-- | A broadcast-style queue built on STM.
--
-- Every pushed item is written to an inner 'TChan' (consumed with 'pull').
-- In addition, callers may block in 'pullWith' until an item satisfying a
-- predicate is pushed.
module Queue
  ( Queue
  , QueueM
  , newQueue
  , push
  , pull
  , pullWith
  ) where

import Common (delete)
import Control.Concurrent.STM
  ( TChan
  , TMVar
  , TVar
  , atomically
  , newEmptyTMVarIO
  , newTChan
  , newTVar
  , readTChan
  , readTVar
  , readTVarIO
  , takeTMVar
  , tryPutTMVar
  , writeTChan
  , writeTVar
  )
import Control.Monad (when)
import Intro

-- | Predicate deciding whether a pushed item satisfies a 'pullWith' waiter.
type Selector a = (a -> Bool)

-- | Queue state.
data Queue a = Queue
  { inner :: TChan a
    -- ^ Channel that receives every pushed item.
  , selectors :: [(Selector a, TMVar a)]
    -- ^ Pending 'pullWith' registrations: a predicate paired with the
    -- one-shot slot its waiter is blocked on.
  }

-- | Shared, mutable handle to a 'Queue'.
type QueueM a = TVar (Queue a)

-- | Create an empty queue with no registered selectors.
newQueue :: IO (QueueM a)
newQueue = atomically $ do
  chan <- newTChan
  newTVar $ Queue chan []

-- | Block until an item satisfying the predicate is pushed, then return it.
--
-- Only items pushed after the selector has been registered are considered.
pullWith :: Selector a -> QueueM a -> IO a
pullWith p queueM = do
  t <- newEmptyTMVarIO
  -- Register the selector in a single transaction. Reading the queue and
  -- writing it back in separate steps would let a concurrent 'push' or
  -- 'pullWith' commit in between and then be overwritten by a stale
  -- snapshot (losing other waiters or resurrecting a satisfied one).
  atomically $ do
    q <- readTVar queueM
    writeTVar queueM (q{selectors = (p, t) : q.selectors})
  -- Wait for 'push' to fill the slot.
  atomically $ takeTMVar t

-- | Push an item onto the queue.
--
-- The item is always written to the inner channel. If a registered selector
-- accepts it, the item is also handed to that waiter and the selector is
-- removed from the queue.
push :: QueueM a -> a -> IO ()
push queueM x = atomically $ do
  q <- readTVar queueM
  -- NOTE(review): 'Common.delete' presumably extracts the first element
  -- matching the predicate and returns it with the remaining list; confirm
  -- against its definition.
  case delete (\(p, _) -> p x) q.selectors of
    (Just (_, t), xs) -> do
      writeTChan q.inner x
      isPut <- tryPutTMVar t x
      -- Drop the selector only when its slot actually accepted the item.
      when isPut $ writeTVar queueM (q{selectors = xs})
    (Nothing, _) -> writeTChan q.inner x

-- | Block until an item is available on the inner channel and return it.
pull :: QueueM a -> IO a
pull queueM = atomically $ do
  q <- readTVar queueM
  readTChan q.inner