Last active
November 27, 2024 12:31
-
-
Save lazamar/dc81d60e915e6a3a7ebbbdcaca6904da to your computer and use it in GitHub Desktop.
Thread-safe mysql-simple usage
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Database.MySQL | |
( Connected | |
, withConnection | |
, connected | |
) where | |
import Control.Concurrent (MVar, newMVar, newEmptyMVar, putMVar, takeMVar, tryPutMVar, modifyMVar_) | |
import Control.Concurrent.Async (withAsyncBound, withAsync, wait, waitEither) | |
import Control.Exception (SomeException, SomeAsyncException, fromException, bracket, throwIO, tryJust, tryJust) | |
import Control.Monad (unless, forever) | |
import System.IO.Unsafe (unsafePerformIO) | |
import qualified Database.MySQL.Base as M (initLibrary, initThread, endThread) | |
import qualified Database.MySQL.Simple as M (Connection, ConnectInfo, connect, close) | |
-- | Handle given to users of 'connected'. It never exposes the underlying
-- connection directly; the only way to use it is through 'withConnection'.
newtype Connected = Connected
  { withConnection :: forall a. (M.Connection -> IO a) -> IO a
    -- ^ Run an action in the connection thread.
  }
-- | Process-wide flag recording whether the C library was already initialised.
--
-- 'unsafePerformIO' is used to create a single global 'MVar'; the NOINLINE
-- pragma keeps the definition from being inlined (and thus duplicated) at
-- its use sites.
{-# NOINLINE initialised #-}
initialised :: MVar Bool
initialised = unsafePerformIO $ newMVar False
-- | Initialise the C library only once.
--
-- The flag in 'initialised' is inspected and updated under 'modifyMVar_':
-- 'M.initLibrary' is skipped when the flag is already set, and the flag is
-- 'True' afterwards.
initialise :: IO ()
initialise =
  modifyMVar_ initialised $ \alreadyDone ->
    True <$ unless alreadyDone M.initLibrary
-- | Thread-safe connection to MySQL.
--
-- Makes sure the C library is initialised, then starts a runner (through
-- 'withAsyncBoundThrow') that opens a connection from @cinfo@ and executes
-- scheduled actions one at a time. @f@ receives a 'Connected' handle whose
-- 'withConnection' hands an action over to the runner and blocks until the
-- outcome is available. Exceptions captured by 'try_' while running a
-- scheduled action are rethrown in the thread that scheduled it.
connected :: M.ConnectInfo -> (Connected -> IO a) -> IO a
connected cinfo f = do
  initialise
  -- tryPutMVar at the end avoids BlockedIndefinitelyOnMVar.
  bracket newEmptyMVar (`tryPutMVar` const (return ())) $ \varAction -> do
    let c = Connected (schedule varAction)
    withAsyncBoundThrow (runner varAction) (f c)
  where
    -- Schedule an action to be executed. MVar fairness guarantees FIFO of
    -- scheduled actions.
    schedule :: MVar (M.Connection -> IO ()) -> (forall a. (M.Connection -> IO a) -> IO a)
    schedule varAction act = do
      varResult <- newEmptyMVar
      -- The runner stores the outcome (value or exception) for the caller.
      let action conn = putMVar varResult =<< try_ (act conn)
      putMVar varAction action
      either throwIO return =<< takeMVar varResult

    -- Run actions in bound thread.
    --
    -- Thread initialisation and the connection are acquired in two nested
    -- brackets so that 'M.endThread' runs whenever 'M.initThread' ran, even
    -- when 'M.connect' fails or 'M.close' throws. The order on the normal
    -- path is unchanged: initThread, connect, ..., close, endThread.
    runner :: MVar (M.Connection -> IO ()) -> IO ()
    runner varAction =
      bracket M.initThread (const M.endThread) $ \_ ->
        bracket (M.connect cinfo) M.close $ \conn ->
          forever $ do
            act <- takeMVar varAction
            act conn
-- | Catch only exceptions thrown by the action itself.
--
-- Exceptions that are not a 'SomeAsyncException' are returned as 'Left';
-- anything that is a 'SomeAsyncException' is not caught and keeps
-- propagating.
try_ :: IO a -> IO (Either SomeException a)
try_ = tryJust keepSynchronous
  where
    keepSynchronous :: SomeException -> Maybe SomeException
    keepSynchronous ex =
      case (fromException ex :: Maybe SomeAsyncException) of
        Just _ -> Nothing
        Nothing -> Just ex
-- | Version of 'withAsyncBound' which throws if background task throws.
--
-- @left@ is the background task and @right@ is the foreground computation
-- whose result is returned. Both are started with 'withAsyncBound': the
-- background task must be the bound one (it is the task that owns
-- thread-local state), and the foreground computation stays bound as it was
-- before so existing callers see no change.
--
-- 'waitEither' rethrows the exception of whichever task fails first, so a
-- failing background task aborts the whole call. If the background task
-- merely finishes, the foreground result is still awaited.
withAsyncBoundThrow :: IO a -> IO b -> IO b
withAsyncBoundThrow left right =
  withAsyncBound right $ \r ->
    withAsyncBound left $ \l ->
      waitEither l r >>= either (const (wait r)) return
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I use it as such: