
Signalling to downstream that upstream is exhausted

Question

Using the Haskell pipes library, I'm trying to define a Pipe with the following type:

signalExhausted :: Monad m => Pipe a (Value a) m r 

where the Value data type is defined by:

data Value a = Value a | Exhausted

The pipe should obey the following laws:

toList (each [] >-> signalExhausted) ==                 [Exhausted]
toList (each xs >-> signalExhausted) == map Value xs ++ [Exhausted]

In other words, the pipe should be equivalent to Pipes.Prelude.map Value, except that it should yield an additional Exhausted after all upstream values have been processed, giving downstream a chance to perform some final action.

Can such a Pipe be defined?

Example

> let xs = words "hubble bubble toil and trouble"
> toList $ each xs >-> signalExhausted
[Value "hubble", Value "bubble", Value "toil", Value "and", Value "trouble", Exhausted]

Notes

I'm aware that the pipes-parse library provides the functions draw and parseForever. These look useful, but I can't quite see how to combine them into a Pipe that matches the specification above.

asked Aug 20 '15 08:08 by jsk


1 Answer

A pipe like signalExhausted can't be defined, but a function equivalent to (>-> signalExhausted) can.

>-> is a specialized version of the pull category. Execution is driven by the downstream proxies pulling data from upstream proxies. The downstream proxy sends an empty request () upstream and blocks until a response holding a value comes back from the upstream proxy. When the upstream proxy is exhausted and doesn't have any more values to send back, it returns. You can see the return that matters for these examples in the definition of each.

each = F.foldr (\a p -> yield a >> p) (return ())
-- what to do when the data's exhausted ^                       

The downstream proxy needs a value to continue running, but there's no value the pipes library can possibly provide it, so the downstream proxy never runs again. Since it never runs again, there's no way it can modify or react to the data.
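To make this concrete, here is a minimal sketch of a pipe that tries to emit a final marker after its input runs out. The name forwardTwoThenMark and the marker value 99 are made up for illustration; the point is only that the code after the unanswered await is never reached.

```haskell
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical pipe: forwards two values, then tries to emit a final marker.
forwardTwoThenMark :: Monad m => Pipe Int Int m ()
forwardTwoThenMark = do
    x <- await
    yield x
    y <- await
    yield y
    _ <- await   -- upstream has already returned, so this is never answered
    yield 99     -- never reached: the whole pipeline ends with upstream

-- Upstream supplies exactly two values and then returns.
result :: [Int]
result = P.toList (each [1, 2] >-> forwardTwoThenMark)
-- result == [1,2]
```

The marker never shows up in result, because the composed pipeline terminates as soon as each returns.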

There are two solutions to this problem. The simplest is to map Value over the upstream pipe and add a yield Exhausted after it's done.

import Pipes
import qualified Pipes.Prelude as P

data Value a = Value a | Exhausted
    deriving (Show)

signalExhausted p = p >-> P.map Value >> yield Exhausted

This does exactly what you're looking for, except that the function signalExhausted takes the place of (>-> signalExhausted): it is applied to the upstream producer rather than composed after it.

let xs = words "hubble bubble toil and trouble"
print . P.toList . signalExhausted $ each xs

[Value "hubble",Value "bubble",Value "toil",Value "and",Value "trouble",Exhausted]
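The laws from the question can be checked directly against this function. The sketch below restates the definitions so that it stands alone; the explicit type signature, the extra Eq instance, and the names lawEmpty and lawNonEmpty are additions for the check, not part of the definition above.

```haskell
import Pipes
import qualified Pipes.Prelude as P

data Value a = Value a | Exhausted
    deriving (Show, Eq)   -- Eq added here only so results can be compared

-- Same definition as above, with an assumed explicit signature.
signalExhausted :: Monad m => Producer a m r -> Producer (Value a) m ()
signalExhausted p = (p >-> P.map Value) >> yield Exhausted

-- First law: an empty upstream produces only the Exhausted marker.
lawEmpty :: Bool
lawEmpty = P.toList (signalExhausted (each ([] :: [Int]))) == [Exhausted]

-- Second law: every element is wrapped in Value, followed by Exhausted.
lawNonEmpty :: [Int] -> Bool
lawNonEmpty xs =
    P.toList (signalExhausted (each xs)) == map Value xs ++ [Exhausted]
```

Both lawEmpty and, for example, lawNonEmpty [1, 2, 3] should evaluate to True.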

The more general solution to this problem is to stop the upstream proxy from returning and instead signal downstream when it is exhausted. I demonstrated how to do so in an answer to a related question.

import Control.Monad
import Pipes.Core

returnDownstream :: Monad m => Proxy a' a b' b m r -> Proxy a' a b' (Either r b) m r'
returnDownstream = (forever . respond . Left =<<) . (respond . Right <\\)

This replaces each respond with respond . Right and replaces return with forever . respond . Left, sending the return value downstream along with the responses.
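A small sketch of what downstream then sees. The producer twoThenDone and the name firstFour are hypothetical; P.take 4 is only there to bound the otherwise endless stream of Left values.

```haskell
import Control.Monad (forever)
import Pipes
import Pipes.Core
import qualified Pipes.Prelude as P

returnDownstream :: Monad m => Proxy a' a b' b m r -> Proxy a' a b' (Either r b) m r'
returnDownstream = (forever . respond . Left =<<) . (respond . Right <\\)

-- Hypothetical producer: yields two numbers, then returns a String.
twoThenDone :: Monad m => Producer Int m String
twoThenDone = each [1, 2] >> return "done"

-- Regular values arrive as Right; the return value then repeats as Left.
firstFour :: [Either String Int]
firstFour = P.toList (returnDownstream twoThenDone >-> P.take 4)
-- firstFour == [Right 1,Right 2,Left "done",Left "done"]
```

The repeated Left "done" shows why the transformed proxy never returns: it answers every later request with the same return value.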

returnDownstream is more general than what you are looking for: it transforms a proxy that returns into one that never returns and instead forwards its return value downstream as the Left value of an Either. We can use it to recreate signalExhausted.

signalExhausted p = returnDownstream p >-> respondLeftOnce

respondLeftOnce is an example downstream proxy. It can distinguish the regular values held in Right from the return value held in Left.

respondLeftOnce :: Monad m => Pipe (Either e a) (Value a) m ()
respondLeftOnce = go
    where
        go = do
            ea <- await
            case ea of
                Right a -> yield (Value a) >> go                    
                Left  _ -> yield Exhausted       -- The upstream proxy is exhausted; do something else  
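The Left branch is where downstream gets its chance to perform a final action. As one possible illustration (sumThenReport and total are hypothetical names, not part of pipes), a pipe can accumulate the regular values and emit the total only once upstream signals exhaustion:

```haskell
import Control.Monad (forever)
import Pipes
import Pipes.Core
import qualified Pipes.Prelude as P

returnDownstream :: Monad m => Proxy a' a b' b m r -> Proxy a' a b' (Either r b) m r'
returnDownstream = (forever . respond . Left =<<) . (respond . Right <\\)

-- Hypothetical downstream pipe: sums the Right values and yields the
-- running total once the first Left arrives.
sumThenReport :: Monad m => Pipe (Either r Int) Int m ()
sumThenReport = go 0
  where
    go acc = do
        e <- await
        case e of
            Right n -> go (acc + n)
            Left _  -> yield acc   -- final action after upstream is exhausted

total :: [Int]
total = P.toList (returnDownstream (each [1, 2, 3]) >-> sumThenReport)
-- total == [6]
```

Because sumThenReport returns after yielding the total, the pipeline stops there even though the transformed upstream would keep responding with Left forever.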
answered Oct 16 '22 04:10 by Cirdec