aboutsummaryrefslogtreecommitdiff
path: root/src/Queue.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Queue.hs')
-rw-r--r--src/Queue.hs65
1 files changed, 65 insertions, 0 deletions
diff --git a/src/Queue.hs b/src/Queue.hs
new file mode 100644
index 0000000..56af251
--- /dev/null
+++ b/src/Queue.hs
@@ -0,0 +1,65 @@
+module Queue
+ ( Queue
+ , QueueM
+ , newQueue
+ , push
+ , pull
+ , pullWith
+ )
+where
+
+import Common (delete)
+import Control.Concurrent.STM
+ ( TChan
+ , TMVar
+ , TVar
+ , atomically
+ , newEmptyTMVarIO
+ , newTChan
+ , newTVar
+ , readTChan
+ , readTVar
+ , readTVarIO
+ , takeTMVar
+ , tryPutTMVar
+ , writeTChan
+ , writeTVar
+ )
+import Control.Monad (when)
+import Intro
+
+type Selector a = (a -> Bool)
+
+data Queue a = Queue
+ { inner :: TChan a
+ , selectors :: [(Selector a, TMVar a)]
+ }
+
+type QueueM a = TVar (Queue a)
+
+newQueue :: IO (QueueM a)
+newQueue = atomically $ do
+ chan <- newTChan
+ newTVar $ Queue chan []
+
+pullWith :: Selector a -> QueueM a -> IO a
+pullWith p queueM = do
+ q <- readTVarIO queueM
+ t <- newEmptyTMVarIO
+ atomically $ writeTVar queueM (q{selectors = (p, t) : q.selectors})
+ atomically $ takeTMVar t
+
+push :: QueueM a -> a -> IO ()
+push queueM x = atomically $ do
+ q <- readTVar queueM
+ case delete (\(p, _) -> p x) q.selectors of
+ (Just (_, t), xs) -> do
+ writeTChan q.inner x
+ isPut <- tryPutTMVar t x
+ when isPut $ writeTVar queueM (q{selectors = xs})
+ (Nothing, _) -> writeTChan q.inner x
+
+pull :: QueueM a -> IO a
+pull queueM = atomically $ do
+ q <- readTVar queueM
+ readTChan q.inner