 

Infinite stream of effectful actions

I would like to parse an infinite stream of bytes into an infinite stream of Haskell data. Each chunk of bytes is read from the network, so it is wrapped in the IO monad.

More concretely, I have an infinite stream of type [IO ByteString]. On the other hand, I have a pure parsing function parse :: [ByteString] -> [Object] (where Object is a Haskell data type).

Is there a way to plug my infinite stream of monadic actions into my parsing function?

For instance, is it possible to write a function of type [IO ByteString] -> IO [ByteString], so that I can use my parse function inside the monad?

asked Jan 26 '23 by abitbol


1 Answer

The Problem

Generally speaking, in order for IO actions to be properly ordered and behave predictably, each action needs to complete fully before the next action is run. In a do-block, this means that this works:

main = do
    sequence (map putStrLn ["This","action","will","complete"])
    putStrLn "before we get here"

but unfortunately this won't work, if that final IO action was important:

dontRunMe = do
    putStrLn "This is a problem when an action is"
    sequence (repeat (putStrLn "infinite"))
    putStrLn "<not printed>"

So, even though sequence can be specialized to the right type signature:

sequence :: [IO a] -> IO [a]

it doesn't work as expected on an infinite list of IO actions. You'll have no problem defining such a sequence:

badSeq :: IO [Char]
badSeq = sequence (repeat (return '+'))

but any attempt to execute the IO action (e.g., by trying to print the head of the resulting list) will hang:

main = (head <$> badSeq) >>= print

It doesn't matter if you only need a part of the result. You won't get anything out of the IO monad until the entire sequence is done (so "never" if the list is infinite).
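
For contrast, the purely lazy analogue is perfectly happy with an infinite list; it is only the IO sequencing that forces everything (a small illustration):

pureWorks :: String
pureWorks = take 5 (repeat '+')        -- "+++++", returns immediately

ioHangs :: IO String
ioHangs = take 5 <$> sequence (repeat (return '+'))  -- running this action never completes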

The "Lazy IO" Solution

If you want to get data from a partially completed IO action, you need to be explicit about it and make use of a scary-sounding Haskell escape hatch, unsafeInterleaveIO. This function takes an IO action and "defers" it so that it won't actually execute until the value is demanded.

The reason this is unsafe in general is that an IO action that makes sense now might mean something different if it's actually executed at a later point in time. As a simple example, an IO action that truncates/removes a file has a very different effect if it's executed before versus after updated file contents are written!
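
Here is a minimal sketch of that hazard (the file name is made up): the deferred action observes the world as it is when its result is demanded, not when unsafeInterleaveIO was called.

import System.IO.Unsafe (unsafeInterleaveIO)

hazard :: IO ()
hazard = do
    writeFile "demo.txt" "old contents"
    deferred <- unsafeInterleaveIO (readFile "demo.txt")  -- nothing is read yet
    writeFile "demo.txt" "new contents"
    putStrLn deferred  -- forces the deferred read here: prints "new contents"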

Anyway, what you'd want to do here is write a lazy version of sequence:

import System.IO.Unsafe (unsafeInterleaveIO)

lazySequence :: [IO a] -> IO [a]
lazySequence [] = return []  -- oops, not infinite after all
lazySequence (m:ms) = do
  x <- m
  xs <- unsafeInterleaveIO (lazySequence ms)
  return (x:xs)

The key point here is that, when a lazySequence infstream action is executed, it will actually execute only the first action; the remaining actions will be wrapped up in a deferred IO action that won't truly execute until the second and subsequent elements of the returned list are demanded.

This works for fake IO actions:

> take 5 <$> lazySequence (repeat (return '+'))
"+++++"
>

(where if you replaced lazySequence with sequence, it would hang). It also works for real IO actions:

> lns <- lazySequence (repeat getLine)
<waits for first line of input, then returns to prompt>
> print (head lns)
<prints whatever you entered>
> length (head (tail lns))  -- force next element
<waits for second line of input>
<then shows length of your second line before prompt>
>

Anyway, with this definition of lazySequence and types:

parse :: [ByteString] -> [Object]
inputs :: [IO ByteString]

you should have no trouble writing:

outputs :: IO [Object]
outputs = parse <$> lazySequence inputs

and then using it lazily however you want:

main = do
    objs <- outputs
    mapM_ doSomethingWithObj objs
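
Because the list is produced lazily, you can also consume just a prefix, and only as many of the input actions run as parse needs to produce those objects (a sketch, assuming parse is itself lazy in its input list; doSomethingWithObj is the same placeholder as above):

firstTen :: IO ()
firstTen = do
    objs <- outputs
    mapM_ doSomethingWithObj (take 10 objs)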

Using Conduit

Even though the above lazy IO mechanism is pretty simple and straightforward, lazy IO has fallen out of favor for production code due to issues with resource management, fragility with respect to space leaks (where a small change to your code blows up the memory footprint), and problems with exception handling.

One solution is the conduit library. Another is pipes. Both are carefully designed streaming libraries that can support infinite streams.

For conduit, if you had a parse function that created one object per byte string, like:

parse1 :: ByteString -> Object
parse1 = ...

then given:

inputs :: [IO ByteString]
inputs = ...

useObject :: Object -> IO ()
useObject = ...

the conduit would look something like:

import Conduit

main :: IO ()
main = runConduit $  mapM_ yieldM inputs
                  .| mapC parse1
                  .| mapM_C useObject

Given that your parse function has signature:

parse :: [ByteString] -> [Object]

I'm pretty sure you can't integrate this with conduit directly (or at least not in any way that wouldn't toss out all the benefits of using conduit). You'd need to rewrite it to be conduit friendly in how it consumed byte strings and produced objects.
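
For comparison, a roughly equivalent pipes pipeline might look like the sketch below, reusing the same hypothetical inputs, parse1, and useObject; each, Pipes.Prelude.mapM, map, and mapM_ play roughly the roles that yieldM, mapC, and mapM_C play above.

import Pipes
import qualified Pipes.Prelude as P

main :: IO ()
main = runEffect $
      each inputs        -- Producer of the IO ByteString actions
  >-> P.mapM id          -- run each action, yielding its ByteString
  >-> P.map parse1       -- pure per-chunk parse
  >-> P.mapM_ useObject  -- consume each Object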

answered Jan 29 '23 by K. A. Buhr