Using the Haskell pipes library, I'm trying to define a Pipe
with the following type:
signalExhausted :: Monad m => Pipe a (Value a) m r
where the Value
data type is defined by:
data Value a = Value a | Exhausted
The pipe should obey the following laws:
toList (each [] >-> signalExhausted) == [Exhausted]
toList (each xs >-> signalExhausted) == map Value xs ++ [Exhausted]
In other words, the pipe should be equivalent to Pipes.Prelude.map Value
, except that it should yield an additional Exhausted
after all upstream values have been processed, giving downstream a chance to perform some final action.
Can such a Pipe
be defined?
> let xs = words "hubble bubble toil and trouble"
> toList $ each xs >-> signalExhausted
[Value "hubble", Value "bubble", Value "toil", Value "and", Value "trouble", Exhausted]
I'm aware that the pipes-parse
library provides the functions draw
and parseForever
. These look useful, but I can't quite see how to combine them into a Pipe
that matches the specification above.
A pipe like signalExhausted
can't be defined, but a function equivalent to (>-> signalExhausted)
can.
>->
is a specialized version of the pull
category. Execution is driven by the downstream proxies pulling data from upstream proxies. The downstream proxy sends an empty request ()
upstream and blocks until a response holding a value comes back from the upstream proxy. When the upstream proxy is exhausted and doesn't have any more values to send back, it return
s. You can see the return
that matters for these examples in the definition of each
.
each = F.foldr (\a p -> yield a >> p) (return ())
-- what to do when the data's exhausted ^
The downstream proxy needs a value to continue running, but there's no value the pipes library can possibly provide it, so the downstream proxy never runs again. Since it never runs again, there's no way it can modify or react to the data.
There are two solutions to this problem. The simplest is to map
Value
over the upstream pipe and add a yield Exhausted
after it's done.
import Pipes
import qualified Pipes.Prelude as P
data Value a = Value a | Exhausted
deriving (Show)
signalExhausted p = p >-> P.map Value >> yield Exhausted
This does exactly what you're looking for except the function signalExhausted
takes the place of (>-> signalExhausted)
.
let xs = words "hubble bubble toil and trouble"
print . P.toList . signalExhausted $ each xs
[Value "hubble",Value "bubble",Value "toil",Value "and",Value "trouble",Exhausted]
The more general solution to this problem is to stop the upstream proxy from returning and instead signal downstream when it is exhausted. I demonstrated how to do so in an answer to a related question.
import Control.Monad
import Pipes.Core
returnDownstream :: Monad m => Proxy a' a b' b m r -> Proxy a' a b' (Either r b) m r'
returnDownstream = (forever . respond . Left =<<) . (respond . Right <\\)
This replaces each respond
with respond . Right
and replaces return
with forever . respond . left
, sending returns downstream along with responses.
returnDownstream
is more general than what you are looking for. We can demonstrate how to use it to recreate signalExhausted
. returnDownstream
transforms a pipe that returns into one that never returns, and instead forwards its return value downstream as the Left
value of an Either
.
signalExhausted p = returnDownstream p >-> respondLeftOnce
respondLeftOnce
is an example downstream proxy. The downstream proxy can discern between regular values held in Right
and the return value held in Left
.
respondLeftOnce :: Monad m => Pipe (Either e a) (Value a) m ()
respondLeftOnce = go
where
go = do
ea <- await
case ea of
Right a -> yield (Value a) >> go
Left _ -> yield Exhausted -- The upstream proxy is exhausted; do something else
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With