In Parallel and Concurrent Programming in Haskell, Simon Marlow presents a Stream a type
built on the following data type, together with some producers and consumers:
data IList a
  = Nil
  | Cons a (IVar (IList a))

type Stream a = IVar (IList a)

streamFromList :: NFData a => [a] -> Par (Stream a)
streamFromList xs = do
  var <- new
  fork $ loop xs var
  return var
 where
  loop [] var = put var Nil
  loop (x:xs) var = do
    tail <- new
    put var (Cons x tail)
    loop xs tail
Later, he mentions the drawbacks of this approach and proposes a solution:
In our previous example, the consumer was faster than the producer. If, instead, the producer had been faster than the consumer, then there would be nothing to stop the producer from getting a long way ahead of the consumer and building up a long IList chain in memory. This is undesirable, because large heap data structures incur overhead due to garbage collection, so we might want to rate-limit the producer to avoid it getting too far ahead. There’s a trick that adds some automatic rate-limiting to the stream API. It entails adding another constructor to the IList type:
data IList a = Nil | Cons a (IVar (IList a)) | Fork (Par ()) (IList a)
However, he doesn't finish this approach:
I’ll leave the rest of the implementation of this idea as an exercise for you to try on your own. See if you can modify streamFromList, streamFold, and streamMap to incorporate the Fork constructor. The chunk size and fork distance should be parameters to the producers (streamFromList and streamMap).
The same question has been asked on the mailing list, but nobody gave an answer.
So how could one limit the rate of the producer?
The important part lies in the loop function:
loop [] var = put var Nil
loop (x:xs) var = do
  tail <- new
  put var (Cons x tail)
  loop xs tail
We need to add the fork distance f and the chunk size c as parameters:
loop _ _ [] var = put var Nil
loop 0 c (x:xs) var = -- see below
loop f c (x:xs) var = do
  tail <- new
  put var (Cons x tail)
  loop (f-1) c xs tail
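The fork distance gets reduced in every iteration. What do we need to do when the fork distance is zero? We provide a Fork op t, where op continues to produce the list. Since loop takes four arguments, op has to supply a new countdown as well; here it restarts at the chunk size c:
loop 0 c (x:xs) var = do
  tail <- new
  let op = loop c c xs tail -- restart the countdown at c
  put var (Fork op (Cons x tail))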
Note that we don't use Fork if the list is empty. That would be possible, but it is a little silly: after all, there is nothing left to produce. Changing streamFromList is now simple:
streamFromList :: NFData a => Int -> Int -> [a] -> Par (Stream a)
streamFromList f c xs = do
  var <- new
  fork $ loop f c xs var
  return var
Now, in order to use it, we need to change the case in streamFold:
streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn acc instrm = acc `seq` do
  ilst <- get instrm
  case ilst of
    Cons h t -> streamFold fn (fn acc h) t
    Fork p (Cons h t) -> -- see below
    _ -> return acc
Remember, our streamFromList never puts an empty list inside a Fork, but just in case, the wildcard matches that situation (as well as Nil).
What do we need to do if we encounter a Fork with data? First of all, we need to use fork to run the Par () operation so that the tail t gets filled, and then we can start to consume it. So our last case is
    Fork p (Cons h t) -> fork p >> streamFold fn (fn acc h) t
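Putting the pieces above together, a minimal sketch of a complete module might look like this. It assumes the monad-par and deepseq packages. The NFData instance for the Fork constructor (which skips the suspended producer) and the choice to restart the countdown at c are assumptions on my part, not something taken from the book. The local name tl is used instead of tail only to avoid shadowing the Prelude function.

```haskell
module Main where

import Control.DeepSeq (NFData (..))
import Control.Monad.Par (IVar, Par, fork, get, new, put, runPar)

data IList a
  = Nil
  | Cons a (IVar (IList a))
  | Fork (Par ()) (IList a)

type Stream a = IVar (IList a)

-- put forces its argument, so IList needs an NFData instance.
-- Assumption: the suspended producer inside Fork is left unevaluated.
instance NFData a => NFData (IList a) where
  rnf Nil        = ()
  rnf (Cons a b) = rnf a `seq` rnf b
  rnf (Fork _ l) = rnf l

-- f: elements produced before the first Fork; c: countdown used afterwards.
streamFromList :: NFData a => Int -> Int -> [a] -> Par (Stream a)
streamFromList f c xs = do
  var <- new
  fork $ loop f c xs var
  return var

loop :: NFData a => Int -> Int -> [a] -> Stream a -> Par ()
loop _ _ [] var = put var Nil
loop 0 c (x:xs) var = do
  tl <- new
  -- Do not run the rest of the producer now; hand it to the consumer.
  let op = loop c c xs tl
  put var (Fork op (Cons x tl))
loop f c (x:xs) var = do
  tl <- new
  put var (Cons x tl)
  loop (f - 1) c xs tl

streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn acc instrm = acc `seq` do
  ilst <- get instrm
  case ilst of
    Cons h t          -> streamFold fn (fn acc h) t
    -- Restart the producer, then keep consuming.
    Fork p (Cons h t) -> fork p >> streamFold fn (fn acc h) t
    _                 -> return acc

main :: IO ()
main = print (runPar (streamFromList 2 3 [1 .. 10 :: Int] >>= streamFold (+) 0))
-- prints 55
```

With these parameters the producer emits two plain Cons cells, then stops at a Fork; it only continues once the consumer reaches that Fork and forks the suspended operation.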
streamMap is analogous, except that its loop again takes the additional parameters, just like the one in streamFromList.
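For illustration, here is one way streamMap could be written along those lines. This is only a sketch under the same assumptions as before (monad-par, deepseq, an NFData instance that skips the suspended producer, countdown restarted at c). The helpers fromList and toList are hypothetical and exist only to make the example runnable.

```haskell
module Main where

import Control.DeepSeq (NFData (..))
import Control.Monad.Par (IVar, Par, fork, get, new, put, runPar)

data IList a = Nil | Cons a (IVar (IList a)) | Fork (Par ()) (IList a)

type Stream a = IVar (IList a)

instance NFData a => NFData (IList a) where
  rnf Nil        = ()
  rnf (Cons a b) = rnf a `seq` rnf b
  rnf (Fork _ l) = rnf l

-- Acts as a consumer of the input (forking any Fork it meets) and as a
-- rate-limited producer of the output.
streamMap :: NFData b => Int -> Int -> (a -> b) -> Stream a -> Par (Stream b)
streamMap f c fn instrm = do
  outstrm <- new
  fork $ loop f instrm outstrm
  return outstrm
 where
  loop n inp out = get inp >>= step n out
  step _ out Nil        = put out Nil
  step n out (Fork p l) = fork p >> step n out l
  step n out (Cons h t)
    | n == 0 = do
        tl <- new
        -- Suspend the rest of the mapping until the consumer asks for it.
        put out (Fork (loop c t tl) (Cons (fn h) tl))
    | otherwise = do
        tl <- new
        put out (Cons (fn h) tl)
        loop (n - 1) t tl

-- Hypothetical helper: plain producer without rate limiting.
fromList :: NFData a => [a] -> Par (Stream a)
fromList xs = do
  var <- new
  fork $ go xs var
  return var
 where
  go [] v = put v Nil
  go (y:ys) v = do
    tl <- new
    put v (Cons y tl)
    go ys tl

-- Hypothetical helper: collect a stream into a list, forking on each Fork.
toList :: Stream a -> Par [a]
toList s = get s >>= walk
 where
  walk Nil        = return []
  walk (Fork p l) = fork p >> walk l
  walk (Cons h t) = (h :) <$> toList t

main :: IO ()
main = print (runPar (fromList [1 .. 10 :: Int] >>= streamMap 2 3 (* 2) >>= toList))
-- prints [2,4,6,8,10,12,14,16,18,20]
```

Unlike the streamFold case above, step handles a Fork wrapping any IList, so streamMap also works on streams whose producer does put an empty list inside a Fork.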