diff options
Diffstat (limited to 'src/Queue.hs')
-rw-r--r-- | src/Queue.hs | 65 |
1 files changed, 65 insertions, 0 deletions
diff --git a/src/Queue.hs b/src/Queue.hs new file mode 100644 index 0000000..56af251 --- /dev/null +++ b/src/Queue.hs @@ -0,0 +1,65 @@ +module Queue + ( Queue + , QueueM + , newQueue + , push + , pull + , pullWith + ) +where + +import Common (delete) +import Control.Concurrent.STM + ( TChan + , TMVar + , TVar + , atomically + , newEmptyTMVarIO + , newTChan + , newTVar + , readTChan + , readTVar + , readTVarIO + , takeTMVar + , tryPutTMVar + , writeTChan + , writeTVar + ) +import Control.Monad (when) +import Intro + +type Selector a = (a -> Bool) + +data Queue a = Queue + { inner :: TChan a + , selectors :: [(Selector a, TMVar a)] + } + +type QueueM a = TVar (Queue a) + +newQueue :: IO (QueueM a) +newQueue = atomically $ do + chan <- newTChan + newTVar $ Queue chan [] + +pullWith :: Selector a -> QueueM a -> IO a +pullWith p queueM = do + q <- readTVarIO queueM + t <- newEmptyTMVarIO + atomically $ writeTVar queueM (q{selectors = (p, t) : q.selectors}) + atomically $ takeTMVar t + +push :: QueueM a -> a -> IO () +push queueM x = atomically $ do + q <- readTVar queueM + case delete (\(p, _) -> p x) q.selectors of + (Just (_, t), xs) -> do + writeTChan q.inner x + isPut <- tryPutTMVar t x + when isPut $ writeTVar queueM (q{selectors = xs}) + (Nothing, _) -> writeTChan q.inner x + +pull :: QueueM a -> IO a +pull queueM = atomically $ do + q <- readTVar queueM + readTChan q.inner |