Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How can I write a pipe that sends downstream a list of what it receives from upstream?

I'm having a hard time to write a pipe with this signature:

toOneBigList :: (Monad m, Proxy p) => () -> Pipe p a [a] m r

It should simply take all as from upstream and send them in a list downstream.

All my attempts look fundamentally broken.

Can anybody point me in the right direction?

like image 429
Giacomo Tesio Avatar asked Dec 20 '22 05:12

Giacomo Tesio


2 Answers

There are two pipes-based solutions and I'll let you pick which one you prefer.

Note: It's not clear why you output the list on the downstream interface instead of just returning it directly as a result.

Conduit-style

The first one, which is very close to the conduit-based solution uses the upcoming pipes-pase, which is basically complete and just needs documentation. You can find the latest draft on Github.

Using pipes-parse, the solution is identical to the conduit solution that Petr gave:

import Control.Proxy
import Control.Proxy.Parse

combine
    :: (Monad m, Proxy p)
    => () -> Pipe (StateP [Maybe a] p) (Maybe a) [a] m ()
combine () = loop []
  where
    loop as = do
        ma <- draw
        case ma of
            Nothing -> respond (reverse as)
            Just a  -> loop (a:as)

draw is like conduit's await: it requests a value from either the leftovers buffer (that's the StateP part) or from upstream if the buffer is empty. Nothing indicates end of file.

You can wrap a pipe that does not have an end of file signal using the wrap function from pipes-parse, which has type:

wrap :: (Monad m, Proxy p) => p a' a b' b m r -> p a' a b' (Maybe b) m s

Classic Pipes Style

The second alternative is a bit simpler. If you want to fold a given pipe you can do so directly using WriterP:

import Control.Proxy
import Control.Proxy.Trans.Writer

foldIt
  :: (Monad m, Proxy p) =>
     (() -> Pipe p a b m ()) -> () -> Pipe p a [b] m ()
foldIt p () = runIdentityP $ do
    r <- execWriterK (liftP . p >-> toListD >-> unitU) ()
    respond r

That's a higher-level description of what is going on, but it requires passing in the pipe as an explicit argument. It's up to you which one you prefer.

By the way, this is why I was asking why you want to send a single value downstream. The above is much simpler if you return the folded list:

foldIt p = execWriterK (liftP . p >-> toListD)

The liftP might not even be necessary if p is completely polymorphic in its proxy type. I only include it as a precaution.

Bonus Solution

The reason pipes-parse does not provide the toOneBigList is that it's always a pipes anti-pattern to group the results into a list. pipes has several nice features that make it possible to never have to group the input into a list, even if you are trying to yield multiple lists. For example, using respond composition you can have a proxy yield the subset of the stream it would have traversed and then inject a handler that uses that subset:

example :: (Monad m, Proxy p) => () -> Pipe p a (() -> Pipe p a a m ()) m r
example () = runIdentityP $ forever $ do
    respond $ \() -> runIdentityP $ replicateM_ 3 $ request () >>= respond

printIt :: (Proxy p, Show a) => () -> Pipe p a a IO r
printIt () = runIdentityP $ do
    lift $ putStrLn "Here we go!"
    printD ()

useIt :: (Proxy p, Show a) => () -> Pipe p a a IO r
useIt = example />/ (\p -> (p >-> printIt) ())

Here's an example of how to use it:

>>> runProxy $ enumFromToS 1 10 >-> useIt
Here we go!
1
2
3
Here we go!
4
5
6
Here we go!
7
8
9
Here we go!
10

This means you never need to bring a single element into memory even when you need to group elements.

like image 61
Gabriella Gonzalez Avatar answered Mar 09 '23 01:03

Gabriella Gonzalez


I'll give only a partial answer, perhaps somebody else will have a better one.

As far as I know, standard pipes have no mechanism of detecting when the other part of the pipeline terminates. The first pipe that terminates produces the final result of the pipe-line and all the others are just dropped. So if you have a pipe that consumes input forever (to eventually produce a list), it will have no chance acting and producing output when its upstream finishes. (This is intentional so that both up- and down-stream parts are dual to each other.) Perhaps this is solved in some library building on top of pipes.

The situation is different with conduit. It has consume function that combines all inputs into a list and returns (not outputs) it. Writing a function like the one you need, that outputs the list at the end, is not difficult:

import Data.Conduit

combine :: (Monad m) => Conduit a m [a]
combine = loop []
  where
    loop xs = await >>= maybe (yield $ reverse xs) (loop . (: xs))
like image 37
Petr Avatar answered Mar 09 '23 00:03

Petr