1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
module Queue
( Queue
, QueueM
, newQueue
, push
, pull
, pullWith
)
where
import Common (delete)
import Control.Concurrent.STM
( TChan
, TMVar
, TVar
, atomically
, newEmptyTMVarIO
, newTChan
, newTVar
, readTChan
, readTVar
, readTVarIO
, takeTMVar
, tryPutTMVar
, writeTChan
, writeTVar
)
import Control.Monad (when)
import Intro
type Selector a = (a -> Bool)
data Queue a = Queue
{ inner :: TChan a
, selectors :: [(Selector a, TMVar a)]
}
type QueueM a = TVar (Queue a)
newQueue :: IO (QueueM a)
newQueue = atomically $ do
chan <- newTChan
newTVar $ Queue chan []
pullWith :: Selector a -> QueueM a -> IO a
pullWith p queueM = do
q <- readTVarIO queueM
t <- newEmptyTMVarIO
atomically $ writeTVar queueM (q{selectors = (p, t) : q.selectors})
atomically $ takeTMVar t
push :: QueueM a -> a -> IO ()
push queueM x = atomically $ do
q <- readTVar queueM
case delete (\(p, _) -> p x) q.selectors of
(Just (_, t), xs) -> do
writeTChan q.inner x
isPut <- tryPutTMVar t x
when isPut $ writeTVar queueM (q{selectors = xs})
(Nothing, _) -> writeTChan q.inner x
pull :: QueueM a -> IO a
pull queueM = atomically $ do
q <- readTVar queueM
readTChan q.inner
|