I want to process a stream of events received via MQTT. The library I'm using delivers the results through a callback. The processing I'm doing depends on the previous state, not only on the latest event. In the future, events might also be gathered from other sources.
At first I decided to collect the events into a list, which sounded like a good idea. I had a minor issue because IO prevents lazy evaluation and waiting for an infinite stream could take forever, but I solved it by interleaving IO.
stream :: IO [Event]
allows me to do nice things like foldl, foldM, map, mapM, etc. Unfortunately, with this approach I probably won't be able to combine two streams, because there is no locking mechanism there anymore.
I was digging through many libraries and found STM with TQueue, for example. Unfortunately, it is not exactly what I want.
I decided to create a custom type and make it Foldable so that I could fold it. I failed because of IO.
import Control.Concurrent.STM

newtype Stream a = Stream (STM a)

runStream
  :: ((a -> IO ()) -> IO i)
  -> IO (Stream a)
runStream block = do
  queue <- newTQueueIO
  block (atomically . writeTQueue queue)
  return $ Stream (readTQueue queue)

foldStream :: (a -> b -> IO b) -> b -> Stream a -> IO b
foldStream f s (Stream read) = do
  n <- atomically read
  m <- f n s
  foldStream f m (Stream read)

mapStream :: (a -> b) -> Stream a -> Stream b
mapStream f (Stream read) = Stream $ f <$> read

zipStream :: [Stream a] -> Stream a
zipStream = undefined
Which can be used like main = foldStream (\x _ -> print x) () =<< events
Is it possible to implement some of the base type classes so that this stream can be used like a regular list?
The usual trick in these cases is to make the callback write to a queue, and then read from the other end of the queue.
Using a bounded, closeable queue from the stm-chans package, we can define this function:
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue

foldQueue :: TBMQueue a -> (x -> a -> IO x) -> IO x -> (x -> IO b) -> IO b
foldQueue queue step start done =
    let go state =
            do m <- atomically (readTBMQueue queue)
               case m of
                   Nothing -> done state
                   Just a -> step state a >>= go
     in start >>= go
It takes the queue, a step function (similar to the one required by foldM), an action to obtain the initial state, and a "done" action that returns the final result. It then feeds values from the queue to the step function until the queue is closed. Notice that the fold state x is chosen by the caller of foldQueue.
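As a rough illustration of how foldQueue behaves, here is a minimal sketch that fills a queue, closes it, and then folds over it in the same thread. The name sumDemo and the bound of 16 are assumptions made for this example; foldQueue is repeated only so that the snippet stands on its own.

```haskell
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMQueue
  (TBMQueue, closeTBMQueue, newTBMQueueIO, readTBMQueue, writeTBMQueue)

-- Same definition as in the answer, repeated to keep the sketch self-contained.
foldQueue :: TBMQueue a -> (x -> a -> IO x) -> IO x -> (x -> IO b) -> IO b
foldQueue queue step start done =
    let go state =
            do m <- atomically (readTBMQueue queue)
               case m of
                   Nothing -> done state
                   Just a -> step state a >>= go
     in start >>= go

-- Hypothetical demo: the fold state is a running sum chosen by the caller.
sumDemo :: IO Int
sumDemo = do
  queue <- newTBMQueueIO 16                            -- bound is arbitrary
  atomically (mapM_ (writeTBMQueue queue) [1, 2, 3])   -- fits within the bound
  atomically (closeTBMQueue queue)                     -- queued items stay readable
  foldQueue queue (\acc n -> pure (acc + n)) (pure 0) pure
```

Closing the queue is what lets the fold terminate: once the queue is closed and drained, readTBMQueue returns Nothing and the "done" action runs.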
If later we want to upgrade to the monadic folds from the foldl package (which have a very useful Applicative instance), we can do it like this:
import qualified Control.Foldl as L
foldQueue' :: TBMQueue a -> L.FoldM IO a b -> IO b
foldQueue' queue = L.impurely (foldQueue queue)
This uses impurely from the foldl package.
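To show why the Applicative instance is handy, the following sketch combines two independent folds (a sum and a count) and runs them over the queue in a single pass. The names sumAndCount and statsDemo are hypothetical; L.generalize is used to lift the pure fold into a FoldM.

```haskell
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMQueue
  (TBMQueue, closeTBMQueue, newTBMQueueIO, readTBMQueue, writeTBMQueue)
import qualified Control.Foldl as L

-- Definitions from the answer, repeated to keep the sketch self-contained.
foldQueue :: TBMQueue a -> (x -> a -> IO x) -> IO x -> (x -> IO b) -> IO b
foldQueue queue step start done =
    let go state =
            do m <- atomically (readTBMQueue queue)
               case m of
                   Nothing -> done state
                   Just a -> step state a >>= go
     in start >>= go

foldQueue' :: TBMQueue a -> L.FoldM IO a b -> IO b
foldQueue' queue = L.impurely (foldQueue queue)

-- Two pure folds combined applicatively; both see every element once.
sumAndCount :: L.Fold Int (Int, Int)
sumAndCount = (,) <$> L.sum <*> L.length

-- Hypothetical demo: fill, close, then fold with the combined fold.
statsDemo :: IO (Int, Int)
statsDemo = do
  queue <- newTBMQueueIO 16
  atomically (mapM_ (writeTBMQueue queue) [10, 20, 30])
  atomically (closeTBMQueue queue)
  foldQueue' queue (L.generalize sumAndCount)
```

The queue is read only once, and each component fold keeps its own state, so adding another statistic means adding another term to the applicative expression rather than rewriting the loop.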
Sometimes (like when parsing, grouping, or decoding) it's easier to use a pull-based consumer. We can do that with the streaming package:
import Streaming
import qualified Streaming.Prelude as S

foldQueue' :: TBMQueue a -> (Stream (Of a) IO () -> IO r) -> IO r
foldQueue' queue consume = consume (S.untilRight (do
    m <- atomically (readTBMQueue queue)
    return (case m of
        Nothing -> Right ()
        Just a -> Left a)))
Given a function that consumes a stream, we feed to it a stream of values read from the queue.
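As one possible use of the pull-based version, the sketch below groups queued values into chunks of two, which is awkward to express as a plain step function. The function is renamed to withQueueStream here (a hypothetical name) to avoid clashing with the foldl-based foldQueue', and pairsDemo is likewise made up for illustration.

```haskell
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMQueue
  (TBMQueue, closeTBMQueue, newTBMQueueIO, readTBMQueue, writeTBMQueue)
import Streaming (Of, Stream, chunksOf)
import qualified Streaming.Prelude as S

-- Same idea as the streaming-based function in the answer, under another name.
withQueueStream :: TBMQueue a -> (Stream (Of a) IO () -> IO r) -> IO r
withQueueStream queue consume = consume (S.untilRight (do
    m <- atomically (readTBMQueue queue)
    return (case m of
        Nothing -> Right ()      -- queue closed and empty: end the stream
        Just a -> Left a)))      -- yield the value and keep going

-- Hypothetical demo: the consumer pulls values and groups them in pairs.
pairsDemo :: IO [[Int]]
pairsDemo = do
  queue <- newTBMQueueIO 16
  atomically (mapM_ (writeTBMQueue queue) [1, 2, 3, 4, 5])
  atomically (closeTBMQueue queue)
  withQueueStream queue (S.toList_ . S.mapped S.toList . chunksOf 2)
```

Here the consumer decides when to ask for the next element, so grouping, parsing, or decoding logic can be written with the ordinary streaming combinators.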
Often, reading from the queue and writing to it must happen in different threads. We can use functions like concurrently from the async package to handle this cleanly.
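A minimal sketch of that arrangement, assuming a callback-based producer: fakeSubscribe is a made-up stand-in for a real client library (such as an MQTT client) that invokes a handler once per event, and runningTotals is a hypothetical consumer whose state depends on all previous events. Neither name comes from an actual library.

```haskell
import Control.Concurrent.Async (concurrently)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMQueue
  (TBMQueue, closeTBMQueue, newTBMQueueIO, readTBMQueue, writeTBMQueue)
import Control.Exception (finally)

-- Same definition as in the answer, repeated to keep the sketch self-contained.
foldQueue :: TBMQueue a -> (x -> a -> IO x) -> IO x -> (x -> IO b) -> IO b
foldQueue queue step start done =
    let go state =
            do m <- atomically (readTBMQueue queue)
               case m of
                   Nothing -> done state
                   Just a -> step state a >>= go
     in start >>= go

-- Hypothetical callback-style source: calls the handler for each event, then returns.
fakeSubscribe :: [a] -> (a -> IO ()) -> IO ()
fakeSubscribe events handler = mapM_ handler events

-- Producer and consumer run in separate threads; the queue bound (4) is
-- smaller than the number of events, so the producer blocks until the
-- consumer catches up.
runningTotals :: IO [Int]
runningTotals = do
  queue <- newTBMQueueIO 4
  let producer =
        fakeSubscribe [1 .. 8] (atomically . writeTBMQueue queue)
          `finally` atomically (closeTBMQueue queue)   -- always close, even on error
      step (total, acc) n = let t = total + n in pure (t, t : acc)
      consumer = foldQueue queue step (pure (0, [])) (pure . reverse . snd)
  snd <$> concurrently producer consumer
```

With concurrently, an exception in either thread cancels the other and is rethrown, and the finally clause makes sure the queue is closed so the consumer does not wait forever.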