
Implementing Event Streams in Haskell using MVars

I want to port the following JavaScript code to Haskell: http://jsfiddle.net/mz68R/

This is what I tried:

import Control.Concurrent
import Data.IORef

type EventStream a = IORef [MVar a]

newEventStream :: IO (EventStream a)
newEventStream = newIORef []

setEvent :: EventStream a -> a -> IO ()
setEvent stream event = readIORef stream >>= mapM_ (`putMVar` event)

getEvent :: EventStream a -> (a -> IO b) -> IO ThreadId
getEvent stream listener = do
    event <- newEmptyMVar
    modifyIORef stream (++ [event])
    forkIO $ loop (takeMVar event >>= listener)

loop :: Monad m => m a -> m ()
loop a = a >> loop a

main = do
    fib <- newEventStream
    getEvent fib $ \(a, b) -> do
        print (a, b)
        setEvent fib (b, a + b)
    setEvent fib (0,1)

It only partly works: instead of producing an endless stream of Fibonacci pairs, it prints a different number of them on each run and then exits:

aaditmshah@home:~$ runhaskell EventStream.hs
(0,1)
(1,1)
aaditmshah@home:~$ runhaskell EventStream.hs
(0,1)
(1,1)
(1,2)
(2,3)
(3,5)
aaditmshah@home:~$ runhaskell EventStream.hs
(0,1)
(1,1)
(1,2)
(2,3)
(3,5)
(5,8)
(8,13)
(13,21)
(21,34)
(34,55)
(55,89)
(89,144)
(144,233)
(233,377)
(377,610)
(610,987)
(987,1597)
(1597,2584)
(2584,4181)
(4181,6765)
(6765,10946)

I believe that the problem is due to concurrency in the getEvent function but I can't put my finger on it. How do I refactor my code to alleviate this problem?

Aadit M Shah asked Feb 27 '14 18:02


2 Answers

A Haskell program exits as soon as its main thread exits, no matter what other threads are still running. So you have a race condition: the child thread forked by getEvent gets as much work done as it can before the process exits, which is why the output length varies between runs.

One simple fix is to add the line import Control.Monad (forever) to your imports and then, at the end of main, run:

forever $ threadDelay maxBound

This makes the main thread sleep forever, so the child thread keeps running until the program is interrupted. Better approaches depend on the purpose of your actual application.
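To illustrate one such alternative, here is a minimal sketch that assumes the goal is to collect a fixed number of pairs and then exit: the listener fills a done MVar, and main blocks on it instead of sleeping forever. The names fibPairs, seen and done are hypothetical and not from the question. The stream functions are copied from the question, with loop replaced by forever.

```haskell
import Control.Concurrent
import Control.Monad (forever)
import Data.IORef

type EventStream a = IORef [MVar a]

newEventStream :: IO (EventStream a)
newEventStream = newIORef []

setEvent :: EventStream a -> a -> IO ()
setEvent stream event = readIORef stream >>= mapM_ (`putMVar` event)

getEvent :: EventStream a -> (a -> IO b) -> IO ThreadId
getEvent stream listener = do
    event <- newEmptyMVar
    modifyIORef stream (++ [event])
    forkIO $ forever (takeMVar event >>= listener)

-- Hypothetical helper: collect the first n Fibonacci pairs, then return.
fibPairs :: Int -> IO [(Integer, Integer)]
fibPairs n = do
    fib  <- newEventStream
    seen <- newIORef []      -- pairs seen so far, newest first
    done <- newEmptyMVar     -- filled by the listener once n pairs are seen
    tid <- getEvent fib $ \(a, b) -> do
        modifyIORef seen ((a, b) :)
        count <- length <$> readIORef seen
        if count >= n
            then putMVar done ()
            else setEvent fib (b, a + b)
    setEvent fib (0, 1)
    takeMVar done            -- block the calling thread until the listener is finished
    killThread tid           -- the listener would otherwise block forever
    reverse <$> readIORef seen

main :: IO ()
main = fibPairs 10 >>= mapM_ print
```

Because main only returns after takeMVar done succeeds, the process can no longer exit while the listener is still producing pairs.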

Michael Snoyman answered Nov 16 '22 21:11


As an alternative to Michael's answer, you can use the async library, which embodies a number of nice concurrency patterns. In particular, it provides the function

async :: IO a -> IO (Async a)

which runs the given IO action in another thread and immediately returns an Async handle for its eventual result. Obviously we cannot get that a out until the child thread has completed, but the immediate return lets us do something else between starting the child thread and waiting on its completion:

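import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (async, wait)

-- | "Work": sleep for n * 10000 microseconds.
work :: Int -> IO ()
work n = threadDelay (n * 10000)

main :: IO ()
main = do
    ret <- async $ do
        work 100 -- do some "work"
        return True
    putStrLn "Not waiting on the child thread yet; doing other work"
    work 5
    putStrLn "Now we wait"
    result <- wait ret -- blocks until the child thread finishes
    print result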

The point here is that you can start all the child threads from your main thread using async, then have the main thread wait on all of their return values before it is allowed to terminate.

In your case, your children never return, so this means that the main thread will simply stall happily until your program is interrupted.
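As a rough sketch of that pattern with children that do return, the following starts several workers and then waits on each of them. The names square and runAll are hypothetical, chosen only for illustration; async and wait come from Control.Concurrent.Async in the async package.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (async, wait)

-- Hypothetical worker: pretend to work for a while, then return a result.
square :: Int -> IO Int
square n = do
    threadDelay (n * 1000)
    return (n * n)

-- Start every child first, then wait on each one in turn.
-- Results come back in the order the children were started.
runAll :: [Int] -> IO [Int]
runAll ns = do
    children <- mapM (async . square) ns
    mapM wait children

main :: IO ()
main = runAll [3, 1, 2] >>= print
```

Since main cannot finish before every wait has returned, no child is cut off by the process exiting.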

J. Abrahamson answered Nov 16 '22 23:11