
Handling event streams in Haskell

I want to process a stream of events received via MQTT. The library I'm using delivers the results through a callback. The processing I do depends on previous state, not only on the latest event. Also, in the future, events might be gathered from other sources as well.

At first I decided to collect the events into a list, which seemed like a good idea. I had a minor issue because the IO that builds the list runs eagerly, and waiting for an infinite stream to be fully read would never finish, but I solved it with interleaved IO.

stream :: IO [Event] lets me do nice things like foldl, foldM, map, mapM, etc. Unfortunately, with this approach I probably won't be able to combine two streams, because there is no locking mechanism anymore.
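The "interleaved IO" approach presumably relies on unsafeInterleaveIO from System.IO.Unsafe (an assumption; the exact code isn't shown in the question). A minimal sketch that turns a blocking "read next element" action into a lazy, infinite list, where each element is read only when its list cell is demanded. The names lazyList and lazyFromQueue are hypothetical:

```haskell
import Control.Concurrent.STM
import System.IO.Unsafe (unsafeInterleaveIO)

-- Turn a blocking "read next element" action into a lazy, infinite list.
-- unsafeInterleaveIO defers running the action until the cell is forced.
lazyList :: IO a -> IO [a]
lazyList next = unsafeInterleaveIO $ do
  x  <- next
  xs <- lazyList next -- returns immediately; the tail is deferred as well
  return (x : xs)

-- Hypothetical usage: lazily read from a TQueue that the callback writes to.
lazyFromQueue :: TQueue a -> IO [a]
lazyFromQueue queue = lazyList (atomically (readTQueue queue))
```

The downside is that the reads now happen whenever some pure code happens to force the list, which makes it hard to control when blocking occurs or to merge two such lists safely.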

I dug through many libraries and found, for example, STM with TQueue. Unfortunately, it is not exactly what I want.

I decided to create a custom type and make it Foldable so that I could fold over it. I failed because of IO.

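import Control.Concurrent.STM

-- A stream is an STM action that yields the next element,
-- blocking (retrying) while none is available.
newtype Stream a = Stream (STM a)

-- Register a callback that pushes every received value into a TQueue,
-- and return a stream that reads from that queue.
-- Assumes `block` registers the callback and returns; if it runs an
-- event loop instead, it has to be forked (e.g. with forkIO).
runStream
  :: ((a -> IO ()) -> IO i)
  -> IO (Stream a)
runStream block = do
  queue <- newTQueueIO
  _ <- block (atomically . writeTQueue queue)
  return $ Stream (readTQueue queue)

-- Fold over the stream with an IO step function.
-- The stream has no end, so this loops forever.
foldStream :: (a -> b -> IO b) -> b -> Stream a -> IO b
foldStream f s (Stream next) = do
  n <- atomically next
  m <- f n s
  foldStream f m (Stream next)

mapStream :: (a -> b) -> Stream a -> Stream b
mapStream f (Stream next) = Stream $ f <$> next

-- Not implemented yet: should combine several streams into one.
zipStream :: [Stream a] -> Stream a
zipStream = undefined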

Which can be used like this: main = foldStream (\x _ -> print x) () =<< events

Is it possible to implement some of the base type classes for this stream, so that I can work with it as with a regular list?
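A sketch of what the Stream type above can support, assuming the same representation newtype Stream a = Stream (STM a). Functor and Applicative instances come straight from STM: mapStream is just fmap, and <*> reads one element from each stream in a single transaction, so it behaves like a zip. The zipStream stub can be written as a merge with orElse, which takes an element from the first stream that has one ready. Foldable, on the other hand, does not fit, because its methods are pure while reading an element needs STM/IO, so folding stays an IO function like foldStream. readStream is a hypothetical helper name:

```haskell
import Control.Concurrent.STM

-- Same representation as in the question.
newtype Stream a = Stream (STM a)

-- mapStream is exactly fmap.
instance Functor Stream where
  fmap f (Stream next) = Stream (fmap f next)

-- pure repeats a value forever; <*> reads one element from each stream
-- in the same transaction, pairing them up like a zip.
instance Applicative Stream where
  pure x = Stream (pure x)
  Stream f <*> Stream x = Stream (f <*> x)

-- One possible zipStream: a merge. Each read tries the streams in list
-- order and takes the first one that has an element; if all are empty,
-- retry blocks until one of them receives data. Earlier streams win
-- when several have data at the same time.
zipStream :: [Stream a] -> Stream a
zipStream streams = Stream (foldr orElse retry [next | Stream next <- streams])

-- Read a single element, blocking if necessary.
readStream :: Stream a -> IO a
readStream (Stream next) = atomically next
```

Because STM transactions compose, the merged stream needs no extra locking: a read either takes exactly one element from one queue or blocks.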

majkrzak asked Aug 06 '19 19:08


1 Answer

The usual trick in these cases is to make the callback write to a queue, and then read from the other end of the queue.

Using a bounded, closeable queue from the stm-chans package, we can define this function:

import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue

foldQueue :: TBMQueue a -> (x -> a -> IO x) -> IO x -> (x -> IO b) -> IO b
foldQueue queue step start done =
    let go state = 
            do m <- atomically (readTBMQueue queue)
               case m of 
                   Nothing -> done state
                   Just a  -> step state a >>= go
     in start >>= go

It takes the queue, a step function (similar to the one required by foldM), an action to obtain the initial state, and a "done" action that returns the final result, and then feeds data from the queue until it is closed and empty. Notice that the fold state x is chosen by the caller of foldQueue.
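A small self-contained check of foldQueue, with the definition repeated so the snippet compiles on its own. Here a single thread fills the queue and closes it before folding, purely for illustration; in the real program the MQTT callback would do the writing. sumQueued is a hypothetical helper:

```haskell
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue

-- foldQueue from the answer, repeated so this snippet compiles on its own.
foldQueue :: TBMQueue a -> (x -> a -> IO x) -> IO x -> (x -> IO b) -> IO b
foldQueue queue step start done = start >>= go
  where
    go state = do
      m <- atomically (readTBMQueue queue)
      case m of
        Nothing -> done state          -- queue closed and empty
        Just a  -> step state a >>= go -- process one element, continue

-- Hypothetical helper: fill a queue, close it, then fold it into a sum.
-- The capacity is large enough that none of the writes block.
sumQueued :: [Int] -> IO Int
sumQueued xs = do
  queue <- newTBMQueueIO (length xs + 1)
  atomically $ mapM_ (writeTBMQueue queue) xs >> closeTBMQueue queue
  foldQueue queue (\acc x -> pure (acc + x)) (pure 0) pure
```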

If we later want to upgrade to the monadic folds from the foldl package (which have a very useful Applicative instance), we can do it like this:

import qualified Control.Foldl as L

foldQueue' :: TBMQueue a -> L.FoldM IO a b -> IO b 
foldQueue' queue = L.impurely (foldQueue queue)

This uses impurely from the foldl package.
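To illustrate the Applicative instance: two folds combined with <*> run in a single pass over the queue. L.generalize lifts pure folds such as L.sum and L.length into FoldM IO. sumAndCount is a hypothetical helper, and foldQueue is repeated so the snippet stands alone:

```haskell
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
import qualified Control.Foldl as L

-- foldQueue from the answer, repeated so this snippet compiles on its own.
foldQueue :: TBMQueue a -> (x -> a -> IO x) -> IO x -> (x -> IO b) -> IO b
foldQueue queue step start done = start >>= go
  where
    go state = do
      m <- atomically (readTBMQueue queue)
      case m of
        Nothing -> done state
        Just a  -> step state a >>= go

foldQueue' :: TBMQueue a -> L.FoldM IO a b -> IO b
foldQueue' queue = L.impurely (foldQueue queue)

-- Hypothetical helper: sum and count the queued values in one pass.
sumAndCount :: [Int] -> IO (Int, Int)
sumAndCount xs = do
  queue <- newTBMQueueIO (length xs + 1)
  atomically $ mapM_ (writeTBMQueue queue) xs >> closeTBMQueue queue
  foldQueue' queue (L.generalize ((,) <$> L.sum <*> L.length))
```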

Sometimes (like when parsing, grouping, or decoding) it's easier to use a pull-based consumer. We can do that with the streaming package:

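import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
import Streaming
import qualified Streaming.Prelude as S

-- Named differently from foldQueue' above to avoid a clash in one module.
foldQueueStream :: TBMQueue a -> (Stream (Of a) IO () -> IO r) -> IO r
foldQueueStream queue consume = consume (S.untilRight readNext)
  where
    -- Left emits a value; Right () ends the stream once the queue is closed and empty.
    readNext = do
      m <- atomically (readTBMQueue queue)
      return (maybe (Right ()) Left m)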

Given a function that consumes a stream, we feed it a stream of values read from the queue.
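For example, grouping is straightforward with a pull-based consumer: chunksOf and mapped from the streaming package split the stream into pairs. The queue-to-stream function is repeated under the hypothetical name foldQueueStream so the snippet compiles on its own, and the queue is filled up front only for illustration:

```haskell
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
import Streaming
import qualified Streaming.Prelude as S

-- Turn the queue into a stream that ends when the queue is closed and empty.
foldQueueStream :: TBMQueue a -> (Stream (Of a) IO () -> IO r) -> IO r
foldQueueStream queue consume = consume (S.untilRight readNext)
  where
    readNext = do
      m <- atomically (readTBMQueue queue)
      return (maybe (Right ()) Left m)

-- Hypothetical helper: pull values from the queue in chunks of two.
pairsFromQueue :: [Int] -> IO [[Int]]
pairsFromQueue xs = do
  queue <- newTBMQueueIO (length xs + 1)
  atomically $ mapM_ (writeTBMQueue queue) xs >> closeTBMQueue queue
  foldQueueStream queue (S.toList_ . mapped S.toList . chunksOf 2)
```

The consumer decides how many elements to pull for each group, which is awkward to express with a push-based step function.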

Often, reading from the queue and writing to it must happen in different threads. We can use functions like concurrently from the async package to handle this cleanly.
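A sketch of the two-thread setup with concurrently from the async package. fakeSubscribe is a hypothetical stand-in for the callback-based MQTT library, and the sketch assumes the subscribe call returns once no more events will arrive, so the producer can close the queue afterwards. The small capacity makes the producer block whenever the consumer falls behind:

```haskell
import Control.Concurrent.Async (concurrently)
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue

-- Hypothetical stand-in for a callback-based client such as an MQTT library:
-- it invokes the callback once per event and returns when there are no more.
fakeSubscribe :: [Int] -> (Int -> IO ()) -> IO ()
fakeSubscribe events callback = mapM_ callback events

-- Consumer: read until the queue is closed and empty, summing the values.
sumUntilClosed :: TBMQueue Int -> IO Int
sumUntilClosed queue = go 0
  where
    go acc = do
      m <- atomically (readTBMQueue queue)
      case m of
        Nothing -> pure acc
        Just x  -> go $! acc + x

-- Producer and consumer run in separate threads. With a capacity of 2,
-- writeTBMQueue blocks whenever the consumer falls behind.
runPipeline :: [Int] -> IO Int
runPipeline events = do
  queue <- newTBMQueueIO 2
  let producer = do
        fakeSubscribe events (atomically . writeTBMQueue queue)
        atomically (closeTBMQueue queue) -- signal end of input to the consumer
  ((), total) <- concurrently producer (sumUntilClosed queue)
  pure total
```

If either thread throws, concurrently cancels the other one and rethrows the exception, so neither side is left blocked on the queue.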

danidiaz answered Oct 21 '22 13:10