Here's an example Haskell FRP program using the reactive-banana library. I'm only just starting to feel my way with Haskell, and in particular I haven't quite got my head around what FRP means. I'd really appreciate some critique of the code below:
{-# LANGUAGE DeriveDataTypeable #-}
module Main where
{-
Example FRP/zeromq app.
The idea is that messages come into a zeromq socket in the form "id state". The state of each id is tracked until it's complete.
-}
import Control.Monad
import Data.ByteString.Char8 as C (unpack)
import Data.Map as M
import Data.Maybe
import Reactive.Banana
import System.Environment (getArgs)
import System.ZMQ
data Msg = Msg {mid :: String, state :: String}
    deriving (Show, Typeable)
type IdMap = Map String String
-- | Deserialize a string to a Maybe Msg
fromString :: String -> Maybe Msg
fromString s =
    case words s of
        (x:y:[]) -> Just $ Msg x y
        _ -> Nothing
-- | Map a message to a partial operation on a map
-- If the 'state' of the message is "complete" the operation is a delete
-- otherwise it's an insert
toMap :: Msg -> IdMap -> IdMap
toMap msg = case msg of
    Msg id_ "complete" -> delete id_
    _ -> insert (mid msg) (state msg)
main :: IO ()
main = do
    (socketHandle,runSocket) <- newAddHandler
    args <- getArgs
    let sockAddr = case args of
            [s] -> s
            _ -> "tcp://127.0.0.1:9999"
    putStrLn ("Socket: " ++ sockAddr)
    network <- compile $ do
        recvd <- fromAddHandler socketHandle
        let
            -- Filter out the Nothings
            justs = filterE isJust recvd
            -- Accumulate the partially applied toMap operations
            counter = accumE M.empty $ (toMap . fromJust <$> justs)
        -- Print the contents
        reactimate $ fmap print counter
    actuate network
    -- Get a socket and kick off the eventloop
    withContext 1 $ \ctx ->
        withSocket ctx Sub $ \sub -> do
            connect sub sockAddr
            subscribe sub ""
            linkSocketHandler sub runSocket
-- | Receive a message, deserialize it to a 'Msg' and call the action with the message
linkSocketHandler :: Socket a -> (Maybe Msg -> IO ()) -> IO ()
linkSocketHandler s runner = forever $ do
    receive s [] >>= runner . fromString . C.unpack
There's a gist here: https://gist.github.com/1099712.
I'd particularly welcome any comments on whether this is a "good" use of accumE (I'm unclear whether this function will traverse the whole event stream each time, although I'm guessing not).
Also, I'd like to know how one would go about pulling in messages from multiple sockets; at the moment I have one event loop inside a forever. As a concrete example, how would I add a second socket (a REQ/REP pair in zeromq parlance) to query the current state of the IdMap inside counter?
(Author of reactive-banana speaking.)
Overall, your code looks fine to me. I don't actually understand why you are using reactive-banana in the first place, but you'll have your reasons. That said, if you are looking for something like Node.js, remember that Haskell's lightweight threads make it unnecessary to use an event-based architecture.
Addendum: Basically, functional reactive programming is useful when you have a variety of different inputs, states, and outputs that must work together with just the right timing (think GUIs, animations, audio). In contrast, it's overkill when you are dealing with many essentially independent events; these are best handled with ordinary functions and the occasional piece of state.
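To illustrate "ordinary functions and the occasional state", here is a minimal sketch that tracks the same "id state" messages with a plain IORef instead of an event network. The names step and handle are made up for this example, and the messages come from a hard-coded list rather than a zeromq socket, so treat it as an outline rather than a drop-in replacement.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)
import qualified Data.Map as M

type IdMap = M.Map String String

-- | Apply one "id state" line to the map (hypothetical helper).
-- A "complete" state deletes the id, any other state inserts it,
-- and malformed lines leave the map unchanged.
step :: String -> IdMap -> IdMap
step line = case words line of
    [i, "complete"] -> M.delete i
    [i, s]          -> M.insert i s
    _               -> id

main :: IO ()
main = do
    ref <- newIORef M.empty
    -- Update the state for one incoming line, then print the current map.
    let handle line = modifyIORef' ref (step line) >> readIORef ref >>= print
    -- Assumption: in the real program these lines would come from the socket loop.
    mapM_ handle ["a started", "b started", "garbage", "a complete"]
```

The update rule lives in a pure function, and the only mutable piece is the single reference holding the current map.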
Concerning the individual questions:
"I'd particularly welcome any comments around whether this is a "good" use of accumE, (I'm unclear if this function will traverse the whole event stream each time although I'm guessing not)."
Looks fine to me. As you guessed, the accumE function is indeed real-time; it will only store the current accumulated value.
Judging from your guess, you seem to be thinking that whenever a new event comes in, it will travel through the network like a firefly. While this does happen internally, it is not how you should think about functional reactive programming. Rather, the right picture is this: the result of fromAddHandler is the complete list of input events as they will happen. In other words, you should think that recvd contains the ordered list of each and every event from the future. (Of course, in the interest of your own sanity, you shouldn't try to look at them before their time has come. ;-)) The accumE function simply transforms one list into another by traversing it once.
I will need to make this way of thinking more clear in the documentation.
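The list picture can be sketched with ordinary Haskell lists. This is only a model of the semantics, not how reactive-banana is implemented: accumList and update are names invented for this example, and the events are a fixed list of pairs instead of something arriving from a socket.

```haskell
import qualified Data.Map as M

type IdMap = M.Map String String

-- | List model of accumE (hypothetical name): walk the list of update
-- functions once, carrying only the latest accumulated value along.
-- The initial value itself is not emitted, hence the 'drop 1'.
accumList :: a -> [a -> a] -> [a]
accumList start = drop 1 . scanl (\acc f -> f acc) start

-- | Same update rule as toMap in the question, written on plain pairs.
update :: (String, String) -> IdMap -> IdMap
update (i, "complete") = M.delete i
update (i, s)          = M.insert i s

main :: IO ()
main = mapM_ print (accumList M.empty (map update msgs))
  where
    -- Assumed sample input standing in for the stream of received messages.
    msgs = [("a", "started"), ("b", "started"), ("a", "complete")]
```

Because scanl is lazy, the same function also works on an unbounded list, which matches the idea that the input contains events that have not happened yet.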
"Also I'd like to know how one would go about pulling in messages from multiple sockets - at the moment I have one event loop inside a forever. As a concrete example of this how would I add second socket (a REQ/REP pair in zeromq parlance) to query to the current state of the IdMap inside counter?"
If the receive function does not block, you can simply call it twice on different sockets:
linkSocketHandler s1 s2 runner1 runner2 = forever $ do
    receive s1 [] >>= runner1 . fromString . C.unpack
    receive s2 [] >>= runner2 . fromString . C.unpack
If it does block, you will need to use threads; see also the section "Handling Multiple TCP Streams" in the book Real World Haskell. (Feel free to ask a new question on this, as it is outside the scope of this one.)
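For the blocking case, the thread-per-socket idea might look roughly like the sketch below. To keep it self-contained it uses a Chan as a stand-in for a socket with a blocking receive; Source, linkSources and runDemo are hypothetical names, and nothing here touches zeromq or reactive-banana. Whether the action returned by newAddHandler may safely be called from several threads at once is something to check for your version of the library.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar
    (modifyMVar_, newEmptyMVar, newMVar, putMVar, readMVar, takeMVar)
import Control.Monad (forever, replicateM_)

-- | Hypothetical stand-in for a socket whose receive blocks.
type Source = Chan String

-- | Start one receive loop per source, each in its own lightweight thread,
-- so a blocked read on one source never stalls the others.
linkSources :: [(Source, String -> IO ())] -> IO ()
linkSources = mapM_ start
  where
    start (src, handler) = forkIO (forever (readChan src >>= handler))

-- | Feed three messages into two sources and collect what the handlers saw.
runDemo :: IO [String]
runDemo = do
    updates <- newChan
    queries <- newChan
    seen    <- newMVar []       -- handled messages, newest first
    done    <- newEmptyMVar     -- signalled once per handled message
    let record tag msg = do
            modifyMVar_ seen (pure . ((tag ++ ": " ++ msg) :))
            putMVar done ()
    linkSources [(updates, record "update"), (queries, record "query")]
    writeChan updates "a started"
    writeChan queries "a"
    writeChan updates "a complete"
    replicateM_ 3 (takeMVar done)   -- wait until all three were handled
    reverse <$> readMVar seen

main :: IO ()
main = runDemo >>= mapM_ putStrLn
```

Messages from the same source are handled in order, but the interleaving between the two sources depends on the scheduler, so the printed order can vary between runs.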