I am trying to construct a Conduit
that receives as input ByteString
s (of around 1kb per chunk in size) and produces as output concatenated ByteString
s of 512kb chunks.
This seems like it should be simple to do, but I'm having a lot of trouble, most of the strategies I've tried using have only succeeded in dividing the chunks into smaller chunks, I haven't succeeded in concatenating larger chunks.
I started out trying isolate
, then takeExactlyE
and eventually conduitVector
, but to no avail. Eventually I settled on this:
import qualified Data.Conduit as C
import qualified Data.Conduit.Combinators as C
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
chunksOfAtLeast :: Monad m => Int -> C.Conduit B.ByteString m BL.ByteString
chunksOfAtLeast chunkSize = loop BL.empty chunkSize
where
loop buffer n = do
mchunk <- C.await
case mchunk of
Nothing ->
-- Yield last remaining bytes
when (n < chunkSize) (C.yield buffer)
Just chunk -> do
-- Yield when the buffer has been filled and start over
let buffer' = buffer <> BL.fromStrict chunk
l = B.length chunk
if n <= l
then C.yield buffer' >> loop BL.empty chunkSize
else loop buffer' (n - l)
P.S. I decided not to split larger chunks for this function, but this was just a convenient simplification.
However, this seems very verbose given all the conduit functions that deal with chunking[1,2,3,4]. Please help! There must surely be a better way to do this using combinators, but I am missing some piece of intuition!
P.P.S. Is it ok to use lazy bytestring for the buffer as I've done? I'm a bit unclear about the internal representation for bytestring and whether this will help, especially since I'm using BL.length
which I guess might evaluate the thunk anyway?
Just to elaborate on Michael's answer and comments, I ended up with this conduit:
import qualified Data.Conduit as C
import qualified Data.Conduit.Combinators as C
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
-- | "Strict" rechunk of a chunked conduit
chunksOfE' :: (MonadBase base m, PrimMonad base)
=> Int
-> C.Conduit ByteString m ByteString
chunksOfE' chunkSize = C.vectorBuilder chunkSize C.mapM_E =$= C.map fromByteVector
My understanding is that vectorBuilder
will pay the cost for concatenating the smaller chunks early on, producing the aggregated chunks as strict bytestrings.
From what I can tell, an alternative implementation that produces lazy bytestring chunks (i.e. "chunked chunks") might be desirable when the aggregated chunks are very large and/or feed into a naturally streaming interface like a network socket. Here's my best attempt at the "lazy bytestring" version:
import qualified Data.Sequences.Lazy as SL
import qualified Data.Sequences as S
import qualified Data.Conduit.List as CL
-- | "Lazy" rechunk of a chunked conduit
chunksOfE :: (Monad m, SL.LazySequence lazy strict)
=> S.Index lazy
-> C.Conduit strict m lazy
chunksOfE chunkSize = CL.sequence C.sinkLazy =$= C.takeE chunkSize
How about this?
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit
chunksOfAtLeast :: Monad m => Int -> Conduit ByteString m LByteString
chunksOfAtLeast chunkSize =
loop
where
loop = do
lbs <- takeCE chunkSize =$= sinkLazy
unless (null lbs) $ do
yield lbs
loop
main :: IO ()
main =
yieldMany ["hello", "there", "world!"]
$$ chunksOfAtLeast 3
=$ mapM_C print
There are lots of other approaches that you could take depending on your goals. If you wanted to have a strict buffer, then using blaze-builder of vectorBuilder would make a lot of sense. But this keeps the same type signature you have already.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With