I'm working with the streaming library but would accept an answer using pipes or conduit.
Say I have
import Control.Monad (unless)
import Data.Function (fix)
import Streaming (Stream, Of)
import qualified Streaming.Prelude as S

streamChunks :: Int -> Stream (Of Thing) IO ()
streamChunks lastID = do
  flip fix 0 $ \go thingID ->
    unless (thingID > lastID) $ do
      thing <- highLatencyGet thingID
      S.yield thing
      go (thingID+1)
To reduce latency I'd like to fork highLatencyGet to retrieve the next Thing in parallel with processing the previous Thing in the consumer.
Obviously I could rewrite the function above by hand: create an MVar, fork the fetch of the next Thing before calling yield, and so on.
But I want to know if there is an idiomatic (composable) way to do this, such that it could be packaged in a library and could be used on arbitrary IO Streams. Ideally we could configure the prefetch value as well, like:
prefetching :: Int -> Stream (Of a) IO () -> Stream (Of a) IO ()
This solution uses pipes but it could be easily adapted to use streaming. To be precise, it requires the pipes, pipes-concurrency and async packages.
It doesn't work in a "direct" style. Instead of simply transforming the Producer, it also takes a "folding function" that consumes a Producer. This continuation-passing style is necessary for setting up and tearing down the concurrency mechanism.
import Pipes
import Pipes.Concurrent (spawn',bounded,fromInput,toOutput,atomically)
import Control.Concurrent.Async (Concurrently(..))
import Control.Exception (finally)

prefetching :: Int -> Producer a IO () -> (Producer a IO () -> IO r) -> IO r
prefetching bufsize source foldfunc = do
    (outbox,inbox,seal) <- spawn' (bounded bufsize)
    let cutcord effect = effect `finally` atomically seal
    runConcurrently $
        Concurrently (cutcord (runEffect (source >-> toOutput outbox)))
        *>
        Concurrently (cutcord (foldfunc (fromInput inbox)))
The output of the original producer is redirected to a bounded queue. At the same time, we apply the producer-folding function to a producer that reads from the queue.
When either of the concurrent actions completes, whether normally or with an exception, we promptly seal the channel so that the other side is not left hanging.
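For the streaming library the same idea might look roughly like the sketch below. It is an untested adaptation, not part of the pipes code above: the name prefetchingS is made up, and it swaps pipes-concurrency for a plain TBQueue from the stm package plus withAsync from async. It assumes bufsize is at least 1.

```haskell
import Control.Concurrent.Async (withAsync)
import Control.Concurrent.STM
import Control.Exception (finally)
import Streaming (Stream, Of)
import qualified Streaming.Prelude as S

-- Hypothetical streaming counterpart of 'prefetching' (assumes bufsize >= 1).
prefetchingS :: Int -> Stream (Of a) IO () -> (Stream (Of a) IO () -> IO r) -> IO r
prefetchingS bufsize source foldfunc = do
    queue <- newTBQueueIO (fromIntegral bufsize)
    done  <- newTVarIO False
    let -- Producer side: push every element, then flag completion
        -- (the flag is also set if the source fails or is cancelled).
        feed = S.mapM_ (atomically . writeTBQueue queue) source
                 `finally` atomically (writeTVar done True)
        -- Consumer side: the next element, or Nothing once the queue
        -- is empty and the producer has finished.
        next = atomically $
            (Just <$> readTBQueue queue)
            `orElse` (readTVar done >>= check >> pure Nothing)
        drained = S.unfoldr (\() -> maybe (Left ()) (\x -> Right (x, ())) <$> next) ()
    -- withAsync cancels the producer thread when the fold returns.
    withAsync feed $ \_ -> foldfunc drained

main :: IO ()
main = do
    xs <- prefetchingS 2 (S.each [1 .. 5 :: Int]) S.toList_
    print xs
```

The bounded queue lets the source run up to bufsize elements ahead of the consumer. If the fold stops early, withAsync cancels the producer thread. One limitation of this sketch: an exception thrown by the source only ends the stream early and is not re-raised in the consumer.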