aboutsummaryrefslogtreecommitdiff
path: root/src/Queue.hs
blob: 56af2516f264a1ec4106416417c21ae56a24fd20 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
module Queue
  ( Queue
  , QueueM
  , newQueue
  , push
  , pull
  , pullWith
  )
where

import Common (delete)
import Control.Concurrent.STM
  ( TChan
  , TMVar
  , TVar
  , atomically
  , newEmptyTMVarIO
  , newTChan
  , newTVar
  , readTChan
  , readTVar
  , readTVarIO
  , takeTMVar
  , tryPutTMVar
  , writeTChan
  , writeTVar
  )
import Control.Monad (when)
import Intro

type Selector a = (a -> Bool)

data Queue a = Queue
  { inner :: TChan a
  , selectors :: [(Selector a, TMVar a)]
  }

type QueueM a = TVar (Queue a)

newQueue :: IO (QueueM a)
newQueue = atomically $ do
  chan <- newTChan
  newTVar $ Queue chan []

pullWith :: Selector a -> QueueM a -> IO a
pullWith p queueM = do
  q <- readTVarIO queueM
  t <- newEmptyTMVarIO
  atomically $ writeTVar queueM (q{selectors = (p, t) : q.selectors})
  atomically $ takeTMVar t

push :: QueueM a -> a -> IO ()
push queueM x = atomically $ do
  q <- readTVar queueM
  case delete (\(p, _) -> p x) q.selectors of
    (Just (_, t), xs) -> do
      writeTChan q.inner x
      isPut <- tryPutTMVar t x
      when isPut $ writeTVar queueM (q{selectors = xs})
    (Nothing, _) -> writeTChan q.inner x

pull :: QueueM a -> IO a
pull queueM = atomically $ do
  q <- readTVar queueM
  readTChan q.inner