
How to detect end of input with pipes

I'm trying to read a group of up to 50 items from a pipe and process them all at once in a single IO action. (The use case is inserting data into a database: I want to do an entire batch inside one transaction because it is vastly more efficient.) Here is a simplified version of what I've got so far:

type ExampleType = Int

doSomething :: [ExampleType] -> IO ()
doSomething = undefined

-- The base monad is IO here because doSomething runs in IO.
inGroupsOf50 :: Producer ExampleType IO () -> IO ()
inGroupsOf50 input =
    runEffect $ input >-> loop
  where
    loop = do
        entries <- replicateM 50 await
        lift $ doSomething entries  -- Insert a bunch all in one transaction
        loop

The problem is that, as far as I can tell, unless the number of items to insert happens to be a multiple of 50, I'm going to miss some. What I really want instead of replicateM 50 await is something that gives me up to 50 items, or fewer if the input ends, but I can't quite figure out how to write that.
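The suspicion above can be checked on a small input. The following is a minimal sketch, not the code from the question: it uses the same replicateM/await pattern, but written as a Pipe that yields each batch so the result can be inspected purely. The name fixedGroups is hypothetical, and groups of 3 stand in for groups of 50.

```haskell
import Control.Monad (forever, replicateM)
import Pipes (Pipe, await, each, yield, (>->))
import qualified Pipes.Prelude as P

-- Hypothetical helper: collect exactly n elements, then emit them as a list.
-- If upstream ends in the middle of a group, the whole pipeline stops and
-- the partially collected group is never emitted.
fixedGroups :: Monad m => Int -> Pipe a [a] m r
fixedGroups n = forever (replicateM n await >>= yield)

main :: IO ()
main = print (P.toList (each [1 .. 8 :: Int] >-> fixedGroups 3))
-- prints [[1,2,3],[4,5,6]]; the trailing 7 and 8 are lost
```

So the trailing partial batch really is dropped: once the upstream Producer returns, the downstream loop is abandoned mid-collection.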

I've been thinking that pipes-parse might be the right library to be looking at. draw looks to have a promising signature... but so far all the bits aren't fitting together in my head. I have a producer, I'm writing a consumer and I don't really get how that relates to the concept of a parser.

Gareth Charnock asked Dec 29 '14 16:12


1 Answer

Even more than pipes-parse, you quite possibly want to take a look at pipes-group. In particular, let's examine the function chunksOf:

-- this type is slightly specialized
chunksOf 
  :: Monad m => 
     Int -> 
     Lens' (Producer a m x) (FreeT (Producer a m) m x)

The Lens' bit is perhaps scary but can be eliminated quickly: it states that we can convert a Producer a m x to a FreeT (Producer a m) m x [0]:

import Control.Lens (view)

chunkIt :: Monad m => Int -> Producer a m x -> FreeT (Producer a m) m x
chunkIt n = view (chunksOf n)

So now we have to figure out what to do with that FreeT bit. In particular, we're going to want to dig into the free package and pull out the function iterT:

iterT
  :: (Functor f, Monad m) => 
     (f (m a) -> m a) -> 
     (FreeT f m a -> m a)

This function, iterT, lets us "consume" a FreeT one "step" at a time. To understand this, we'll first specialize the type of iterT by replacing f with Producer a m:

runChunk :: Monad m =>
            (Producer a m (m x)       -> m x) ->
            (FreeT (Producer a m) m x -> m x)
runChunk = iterT

In particular, runChunk can "run" a FreeT full of Producers so long as we tell it how to convert a Producer a m (m x) into an m-action. This may start to look more familiar. When we define the first argument of runChunk we simply have to execute a Producer which, in this case, will have no more than the selected number of elements.

But what is up with the effectful return value m x? It's the "continuation", i.e. all of the chunks which come after the current one you're writing. So, for instance, let's assume we've got a Producer of Chars and we'd like to print them with a line break after every 3 characters:

main :: IO ()
main = flip runChunk (chunkIt 3 input) $ \p -> _

The _ hole at this point has type IO (), with p in scope at type Producer Char IO (IO ()). We can consume this pipe with for, gather its return value (which is the continuation, again), emit a newline, and then run the continuation.

input :: Monad m => Producer Char m ()
input = each "abcdefghijklmnopqrstuvwxyz"

main :: IO ()
main = flip runChunk (chunkIt 3 input) $ \p -> do
  cont <- runEffect $ for p (lift . putChar)
  putChar '\n'
  cont

And this behaves exactly as desired:

λ> main
abc
def
ghi
jkl
mno
pqr
stu
vwx
yz

To be clear, while I did a bit of exposition, this is fairly simple code once you see how all the pieces fit together. Here is the whole listing:

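import Control.Lens ((^.))
import Control.Monad.Trans.Free (iterT)
import Pipes
import Pipes.Group (chunksOf)

input :: Monad m => Producer Char m ()
input = each "abcdefghijklmnopqrstuvwxyz"

main :: IO ()
main = flip iterT (input ^. chunksOf 3) $ \p -> do
  -- Print every character of the current chunk; the result is the continuation.
  cont <- runEffect $ for p $ \c -> do
    lift (putChar c)
  putChar '\n'
  -- Run the remaining chunks.
  cont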
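The same shape carries over to the batching problem in the question. What follows is a sketch under stated assumptions rather than a definitive implementation: inGroupsOf and insertBatch are hypothetical names, insertBatch merely stands in for the real database transaction, and the code assumes a pipes version that provides Pipes.Prelude.toListM', which drains a Producer into a list and also hands back its return value (here, the continuation).

```haskell
import Control.Lens (view)
import Control.Monad.Trans.Free (iterT)
import Pipes (Producer, each)
import Pipes.Group (chunksOf)
import qualified Pipes.Prelude as P

-- Hypothetical stand-in for the database insert from the question.
insertBatch :: [Int] -> IO ()
insertBatch batch = putStrLn ("inserting " ++ show (length batch) ++ " rows")

-- Run an action once per group of at most n elements.
-- The last group is simply shorter when the input runs out.
inGroupsOf :: Monad m => Int -> ([a] -> m ()) -> Producer a m () -> m ()
inGroupsOf n action input =
  flip iterT (view (chunksOf n) input) $ \chunk -> do
    -- Collect the current chunk; rest is the continuation (all later chunks).
    (batch, rest) <- P.toListM' chunk
    action batch
    rest

main :: IO ()
main = inGroupsOf 50 insertBatch (each [1 .. 120])
-- prints:
-- inserting 50 rows
-- inserting 50 rows
-- inserting 20 rows
```

Each batch is held in memory as a list of at most n elements, which is fine for batches of 50; the chunks themselves are still produced lazily, one after another.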

[0] Also a bit more than that, but it's enough for now.
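As one illustration of the "bit more" in footnote [0]: because chunksOf is a lens rather than a one-way conversion, the chunks can also be modified and then glued back into a single Producer. The sketch below assumes over from Control.Lens and maps from Pipes.Group; withLineBreaks is a hypothetical name, and the result is collected purely with Pipes.Prelude.toList instead of being printed character by character.

```haskell
import Control.Lens (over)
import Pipes (Producer, each, yield)
import Pipes.Group (chunksOf, maps)
import qualified Pipes.Prelude as P

input :: Monad m => Producer Char m ()
input = each "abcdefghijklmnopqrstuvwxyz"

-- Hypothetical helper: append a newline to every chunk of up to n characters,
-- then let the lens concatenate the chunks back into one Producer.
withLineBreaks :: Monad m => Int -> Producer Char m r -> Producer Char m r
withLineBreaks n = over (chunksOf n) (maps (<* yield '\n'))

main :: IO ()
main = putStr (P.toList (withLineBreaks 3 input))
```

This variant keeps everything as a Producer, so it composes with further pipes instead of running each chunk as a separate effect.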

J. Abrahamson answered Oct 12 '22 21:10