Idiomatic prefetching in a streaming library

I'm working with the streaming library but would accept an answer using pipes or conduit.

Say I have

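import Control.Monad (unless)
import Control.Monad.Trans.Class (lift)
import Data.Function (fix)
import Streaming (Stream, Of)
import qualified Streaming.Prelude as S

-- Thing and highLatencyGet :: Int -> IO Thing are defined elsewhere.
streamChunks :: Int -> Stream (Of Thing) IO ()
streamChunks lastID =
  flip fix 0 $ \go thingID ->
    unless (thingID > lastID) $ do
      thing <- lift (highLatencyGet thingID)  -- lift the IO fetch into Stream
      S.yield thing
      go (thingID+1)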

To reduce latency I'd like to fork highLatencyGet to retrieve the next Thing in parallel with processing the previous Thing in the consumer.

Obviously I could rewrite the function above by hand, creating a new MVar and forking the next fetch before calling yield, and so on.
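For comparison, here is a minimal sketch of that hand-rolled, one-item lookahead. It swaps the raw MVar for the async package (whose `wait` rethrows a failed fetch instead of deadlocking), and `Thing`/`highLatencyGet` are hypothetical stubs standing in for the question's real definitions. It starts fetching `thingID + 1` before yielding `thingID`, so the fetch overlaps with the consumer's work.

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (async, wait)
import Control.Monad.Trans.Class (lift)
import Streaming (Stream, Of)
import qualified Streaming.Prelude as S

-- Hypothetical stand-ins for the question's Thing and highLatencyGet.
type Thing = Int

highLatencyGet :: Int -> IO Thing
highLatencyGet i = threadDelay 1000 >> pure (i * 10)  -- simulated latency

-- Yields things 0..lastID, keeping one fetch in flight ahead of the consumer.
streamChunksAhead :: Int -> Stream (Of Thing) IO ()
streamChunksAhead lastID
  | lastID < 0 = pure ()
  | otherwise  = lift (async (highLatencyGet 0)) >>= go 0
  where
    go thingID pending = do
      thing <- lift (wait pending)  -- rethrows if the fetch failed
      if thingID >= lastID
        then S.yield thing
        else do
          -- Start the next fetch before handing the current thing downstream.
          next <- lift (async (highLatencyGet (thingID + 1)))
          S.yield thing
          go (thingID + 1) next
```

The lookahead depth is hard-wired to one, and if the consumer stops early the in-flight fetch is never cancelled. Those two gaps are exactly what a reusable, configurable combinator would need to address.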

But I'd like to know whether there is an idiomatic, composable way to do this, one that could be packaged in a library and applied to arbitrary IO streams. Ideally the prefetch depth would be configurable too, e.g.:

prefetching :: Int -> Stream (Of a) IO () -> Stream (Of a) IO ()
asked Nov 26 '25 03:11 by jberryman


1 Answer

This solution uses pipes, but it could easily be adapted to use streaming. Specifically, it requires the pipes, pipes-concurrency and async packages.

It doesn't work in a "direct" style. Instead of simply transforming the Producer, it also takes a "folding function" that consumes a Producer. This continuation-passing style is necessary for setting up and tearing down the concurrency mechanism.

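import Pipes
import Pipes.Concurrent (spawn', bounded, fromInput, toOutput, atomically)
import Control.Concurrent.Async (Concurrently(..))
import Control.Exception (finally)

prefetching :: Int -> Producer a IO () -> (Producer a IO () -> IO r) -> IO r
prefetching bufsize source foldfunc = do
    -- Bounded mailbox: the producer side blocks once bufsize items are queued.
    (outbox, inbox, seal) <- spawn' (bounded bufsize)
    -- Seal the mailbox when either side finishes (normally or by exception),
    -- so the other side sees end-of-input instead of blocking forever.
    let cutcord effect = effect `finally` atomically seal
    runConcurrently $
        -- Producer side: drain the original source into the mailbox.
        Concurrently (cutcord (runEffect (source >-> toOutput outbox)))
        *>
        -- Consumer side: fold over a producer that reads from the mailbox.
        Concurrently (cutcord (foldfunc (fromInput inbox)))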

The output of the original producer is redirected to a bounded queue. At the same time, we apply the producer-folding function to a producer that reads from the queue.

As soon as either concurrent action completes, normally or by throwing an exception, we promptly seal the channel so the other side isn't left hanging.
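Since the pipes version "could be easily adapted to use streaming", here is one possible sketch of such an adaptation, keeping the same continuation-passing shape. It is an assumption-laden illustration, not part of the original answer: `prefetchingS` is a hypothetical name, it uses a `TBQueue` from the stm package instead of pipes-concurrency's sealable mailbox, and it assumes `bufsize` is positive. End of stream is detected by `waitSTM` on the filler thread, and `withAsync` cancels the filler when the folding function returns, which plays the role of `seal`.

```haskell
import Control.Concurrent.Async (withAsync, waitSTM)
import Control.Concurrent.STM
  (atomically, newTBQueueIO, readTBQueue, writeTBQueue, orElse)
import Control.Monad.Trans.Class (lift)
import Streaming (Stream, Of)
import qualified Streaming.Prelude as S

prefetchingS :: Int -> Stream (Of a) IO () -> (Stream (Of a) IO () -> IO r) -> IO r
prefetchingS bufsize source foldfunc = do
  -- Bounded buffer: the filler blocks once bufsize items are waiting.
  queue <- newTBQueueIO (fromIntegral bufsize)
  -- Filler thread: run the original stream, pushing each element into the queue.
  let fill = S.mapM_ (atomically . writeTBQueue queue) source
  withAsync fill $ \filler -> do
    let -- Prefer buffered items; once the queue is empty and the filler has
        -- finished, report end of stream. waitSTM rethrows filler exceptions.
        next = (Just <$> readTBQueue queue) `orElse` (Nothing <$ waitSTM filler)
        drain = do
          item <- lift (atomically next)
          case item of
            Nothing -> pure ()
            Just x  -> S.yield x >> drain
    -- When foldfunc returns or throws, withAsync cancels the filler, so an
    -- early-stopping consumer cannot leave it blocked on a full queue.
    foldfunc drain
```

For example, `prefetchingS 8 source S.toList_` collects a stream while up to eight elements are fetched ahead, and `prefetchingS 1 source (S.toList_ . S.take 3)` stops early without leaking the filler thread.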

answered Nov 28 '25 21:11 by danidiaz