I'm trying to read a group of up to 50 items from a pipe and process them in an IO action all at once. (The use case for this is I'm trying to insert data into a database and I want to do an entire batch inside one transaction because it is vastly more efficient). Here is a simplified version of what I've got so far:

    import Control.Monad (replicateM)
    import Pipes

    type ExampleType = Int

    doSomething :: [ExampleType] -> IO ()
    doSomething = undefined

    -- The base monad is fixed to IO because doSomething runs in IO.
    inGroupsOf50 :: Producer ExampleType IO () -> IO ()
    inGroupsOf50 input =
        runEffect $ input >-> loop
      where
        loop = do
          entries <- replicateM 50 await
          lift $ doSomething entries -- Insert a bunch all in one transaction
          loop

The problem is that, as far as I can tell, unless the number of items to insert happens to be divisible by 50, I'm going to miss some. What I really want instead of replicateM 50 await is something that gives me up to 50 items, or fewer if the input ends, but I can't quite figure out how to write that.
I've been thinking that pipes-parse might be the right library to be looking at. Its draw function looks to have a promising signature... but so far all the bits aren't fitting together in my head. I have a producer, I'm writing a consumer, and I don't really get how that relates to the concept of a parser.
Even more than pipes-parse, you quite possibly want to take a look at pipes-group. In particular, let's examine the function

    -- this type is slightly specialized
    chunksOf
      :: Monad m
      => Int
      -> Lens' (Producer a m x) (FreeT (Producer a m) m x)

The Lens' bit is perhaps scary but can be eliminated quickly: it states that we can convert Producer a m x to FreeT (Producer a m) m x [0].

    import Control.Lens (view)

    chunkIt :: Monad m => Int -> Producer a m x -> FreeT (Producer a m) m x
    chunkIt n = view (chunksOf n)

So now we have to figure out what to do with that FreeT bit. In particular, we're going to want to dig into the free package and pull out the function iterT:

    iterT
      :: (Functor f, Monad m)
      => (f (m a) -> m a)
      -> (FreeT f m a -> m a)

This function, iterT, lets us "consume" a FreeT one "step" at a time. To understand this, we'll first specialize the type of iterT, replacing f with Producer a m:

    runChunk
      :: Monad m
      => (Producer a m (m x) -> m x)
      -> (FreeT (Producer a m) m x -> m x)
    runChunk = iterT

In particular, runChunk can "run" a FreeT full of Producers so long as we tell it how to convert a Producer a m (m x) into an m-action. This may start to look more familiar. When we define the first argument of runChunk, we simply have to execute a Producer which, in this case, will yield no more than the selected number of elements.
But what is up with the effectful return value m x? It's the "continuation", i.e. all of the chunks which come after the current one you're writing. So, for instance, let's assume we've got a Producer of Chars and we'd like to print them with a line break after every 3 characters:

    main :: IO ()
    main = flip runChunk (chunkIt 3 input) $ \p -> _

The _ hole at this point has type IO (), with p in the context at type p :: Producer Char IO (IO ()). We can consume this pipe with for, gather its return value (which is the continuation, again), emit a newline, and then run the continuation.

    input :: Monad m => Producer Char m ()
    input = each "abcdefghijklmnopqrstuvwxyz"

    main :: IO ()
    main = flip runChunk (chunkIt 3 input) $ \p -> do
      cont <- runEffect $ for p (lift . putChar)
      putChar '\n'
      cont

And this behaves exactly as desired:

    λ> main
    abc
    def
    ghi
    jkl
    mno
    pqr
    stu
    vwx
    yz

To be clear, while I did a bit of exposition, this is fairly simple code once you see how all the pieces fit together. Here is the whole listing:
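
    import Control.Lens ((^.))
    import Control.Monad.Trans.Free (iterT)
    import Pipes
    import Pipes.Group (chunksOf)

    input :: Monad m => Producer Char m ()
    input = each "abcdefghijklmnopqrstuvwxyz"

    main :: IO ()
    main = flip iterT (input ^. chunksOf 3) $ \p -> do
      -- Print every character in this chunk; the result is the continuation.
      cont <- runEffect $ for p $ \c -> do
        lift (putChar c)
      putChar '\n'
      cont
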
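Coming back to the original question, the same pattern can be used to hand each chunk to an IO action as a list. What follows is only a sketch under a few assumptions: forBatches is a hypothetical helper name (it is not part of pipes-group), doSomething is a stand-in for the real database insert, and each chunk is collected with toListM' from Pipes.Prelude, which returns the gathered list together with the producer's return value (here, the continuation).

```haskell
import Control.Lens (view)
import Control.Monad.Trans.Free (iterT)
import Pipes
import Pipes.Group (chunksOf)
import qualified Pipes.Prelude as P

-- Hypothetical helper (not part of pipes-group): run `handler` once per
-- batch of at most `n` items, including the final, possibly shorter batch.
forBatches :: Monad m => Int -> ([a] -> m ()) -> Producer a m () -> m ()
forBatches n handler input = iterT step (view (chunksOf n) input)
  where
    -- `chunk` yields the items of one batch and returns the continuation
    -- that processes all remaining batches.
    step chunk = do
      (entries, rest) <- P.toListM' chunk
      handler entries
      rest

-- Stand-in for the database insert from the question (an assumption).
doSomething :: [Int] -> IO ()
doSomething batch = putStrLn ("inserting " ++ show (length batch) ++ " rows")

main :: IO ()
main = forBatches 50 doSomething (each [1 .. 120])
```

With 120 inputs and a batch size of 50, the last call to doSomething receives the remaining 20 items rather than dropping them. Collecting a batch into a list holds at most n items in memory at once, which is usually what a single transaction needs anyway.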
[0] Also a bit more than that, but it's enough for now.