Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Parallel Haskell. Rate-Limiting the Producer

In Parallel and Concurrent Programming in Haskell, Simon Marlow provides a Stream a based on the following data, together with some producer and consumer:

data IList a
  = Nil
  | Cons a (IVar (IList a))

type Stream a = IVar (IList a)

streamFromList :: NFData a => [a] -> Par (Stream a)
streamFromList xs = do
      var <- new
      fork $ loop xs var
      return var
    where
      loop [] var = put var Nil
      loop (x:xs) var = do
        tail <- new
        put var (Cons x tail)
        loop xs tail

Later, he mentions the drawbacks of this approach and proposes a solution:

In our previous example, the consumer was faster than the producer. If, instead, the producer had been faster than the consumer, then there would be nothing to stop the producer from getting a long way ahead of the consumer and building up a long IList chain in memory. This is undesirable, because large heap data structures incur overhead due to garbage collection, so we might want to rate-limit the producer to avoid it getting too far ahead. There’s a trick that adds some automatic rate-limiting to the stream API. It entails adding another constructor to the IList type:

data IList a
    = Nil
    | Cons a (IVar (IList a))
    | Fork (Par ()) (IList a)

However, he doesn't finish this approach:

I’ll leave the rest of the implementation of this idea as an exercise for you to try on your own. See if you can modify streamFromList, streamFold, and streamMap to incorporate the Fork constructor. The chunk size and fork distance should be parameters to the producers (streamFromList and streamMap).

The same question has been asked on the mailing list, but nobody gave an answer.

So how could one limit the rate of the producer?

like image 659
fcennecf Avatar asked Mar 20 '23 01:03

fcennecf


1 Answers

The important part lies in the loop function:

  loop [] var = put var Nil
  loop (x:xs) var = do
    tail <- new
    put var (Cons x tail)
    loop xs tail

We need to add the fork distance f and the chunk size c as parameters:

  loop _ _ [] var = put var Nil
  loop 0 c (x:xs) var = -- see below
  loop f c (x:xs) var = do
    tail <- new
    put var (Cons x tail)
    loop (f-1) c xs tail

The fork distance gets reduced in every iteration. What do we need to do when the fork distance is zero? We provide a Fork op t, where op continues to produce the list:

  loop 0 c (x:xs) var = do
    tail <- new
    let op = loop c xs tail
    put var (Fork op (Cons x tail))

Note that we don't use Fork if the list is empty. That would be possible, but is a little bit silly, after all, there's nothing to be produced left. Changing streamFromList is now simple:

streamFromList :: NFData a => Int -> Int -> [a] -> Par (Stream a)
streamFromList f c xs = do
  var <- new                            
  fork $ loop f c xs var                 
  return var 

Now, in order to use it, we need to change the case in streamFold:

streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn acc instrm = acc `seq` do
  ilst <- get instrm
  case ilst of
    Cons h t          -> streamFold fn (fn acc h) t
    Fork p (Cons h t) -> -- see below
    _                 -> return acc

Remember, we didn't allow an empty list in the Fork in our streamFromList, but just in case we're matching it (and Nil) via wildcard.

What do we need to do if we encounter a Fork with data? First of all, we need to use fork to run the Par () operation in order to propagate t, and then we can start to use it. So our last case is

    Fork p (Cons h t) -> fork p >> streamFold fn (fn acc h) t

streamMap is analogous. Only in this case you use additional parameters on your loop again like in streamFromList.

like image 181
Zeta Avatar answered Mar 25 '23 05:03

Zeta