I am having a look at the pipes 3.0 package for stream processing. The tutorial is very well done and very clear, except that I cannot wrap my head around the "zip and merge" section.
My goal is to combine pipes in roughly the way ArrowChoice allows:
+----------+                   +------+ - filterLeft  -> pipe1 -> +------------+
| producer | - (Either a a) -> | fork |                           | mergeD (?) |
+----------+                   +------+ - filterRight -> pipe2 -> +------------+
I define fork as in the tutorial:
fork () =
    runIdentityP . hoist (runIdentityP . hoist runIdentityP) $ forever $ do
        a <- request ()
        lift $ respond a
        lift $ lift $ respond a
oddOrEven x = if odd x then Left x else Right x
producer = fromListS [1..10] >-> mapD oddOrEven
isLeft (Left _) = True
isLeft (Right _) = False
isRight = not . isLeft
filterLeft = filterD isLeft
filterRight = filterD isRight
pipe1 = mapD (\x -> ("seen on left", x))
pipe2 = mapD (\x -> ("seen on right", x))
p1 = producer >-> fork
The problem is that I cannot get the types right. The tutorial only seems to show how to run the inner (lifted) pipe chain as a self-contained session, but I would like to reinject its values into the pipe, not just apply an effect to them. I did of course try to follow the types, but they get hairy very quickly.
Can anyone help me with this? Thanks in advance.
(PS: an example of this kind of topology would be a nice addition to the tutorial, or even better, a section on how to emulate the Control.Arrow stuff using pipes.)
The pipe abstraction does not support diamond topologies or any form of Arrow-like behavior. This is not an API issue; rather, there is no correct or well-defined behavior for such a scenario.
To explain why, allow me to simplify your diagram to the following one:
          +----+
          | pL |
+----+ => +----+ => +----+
| p1 |              | p2 |
+----+ => +----+ => +----+
          | pR |
          +----+
Imagine we are at the p1 pipe and we respond to pL. If you remember the tutorial, the proxy laws require that every respond blocks until downstream requests again. That means that p1 cannot regain control until pL requests again. So at this point we have:
- p1 blocked waiting for a request from pL
However, suppose that pL does not request yet and instead responds with a value of its own to p2. So now we have:
- p1 blocked waiting for a request from pL
- pL blocked waiting for a request from p2
Now suppose that p2 instead requests from pR. The proxy laws say that p2 cannot regain control until pR responds again. Now we have:
- p1 blocked waiting for a request from pL
- pL blocked waiting for a request from p2
- p2 blocked waiting for a respond from pR
Now what happens when pR requests a value from p1? If we consult our list of blocks, p1 is still blocked waiting for a request from pL, so it is in no shape to receive a request from pR. There is no correct way to "tie the knot", so to speak, even if pL and pR shared the same request signature.
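More generally, the proxy laws ensure the following two invariants:
- Every pipe upstream of the active pipe is blocked on a respond
- Every pipe downstream of the active pipe is blocked on a request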
Cycles or diamonds break these invariants. This is why the tutorial very briefly remarks in passing that cyclic topologies do not "make sense".
You can see why diamonds break this invariant in the example I just gave you. When p1 had control it was upstream of pR, which would imply pR was blocked on a request. However, when p2 gained control it was downstream of pR, which would imply pR was blocked on a respond. This leads to a contradiction, because pR couldn't have changed yet, since control flowed through pL and not pR to get to p2.
So there are two solutions to your problem. One solution is to just inline your desired splitting behavior into a single pipe: you define a pE pipe that combines the behavior of pL and pR.
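As a rough sketch of this first option, assuming pL and pR are plain per-element maps like pipe1 and pipe2 in the question, the combined step can be written with either from the Prelude. The name pEStep is hypothetical, and the snippet runs over an ordinary list instead of pipes so that it needs nothing beyond base; in pipes 3.0 one would presumably wrap it as mapD pEStep, the same way the question uses mapD.

```haskell
-- Sketch only: models the combined pipe's per-element step on a plain list.
-- pEStep is a hypothetical name, not part of the pipes API.

oddOrEven :: Int -> Either Int Int
oddOrEven x = if odd x then Left x else Right x

-- One function handles both branches, so no fork or merge is needed.
pEStep :: Either Int Int -> (String, Int)
pEStep = either (\x -> ("seen on left", x)) (\x -> ("seen on right", x))

main :: IO ()
main = mapM_ (print . pEStep . oddOrEven) [1 .. 4]
```

This only covers branches that map one input to one output. Branches that keep state or emit a varying number of values would need a hand-written pipe that does the case analysis itself.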
The more elegant solution to this problem is something in the style of Edward Kmett's machines. You define a more restricted abstraction, less powerful than proxies, that supports ArrowChoice; you do your arrow-ish stuff within the domain of that abstraction, and then when you are done you upgrade it to proxies.
If you squint, you could pretend that there is a category of currently available coroutine abstractions in Haskell that is a partial order. Coroutine abstractions are the objects, and an arrow from coroutine abstraction C1 to coroutine abstraction C2 means that you can embed coroutines of type C1 in coroutines of type C2 (i.e. C1 is an improper subset of C2).
In this partial order, proxies would probably be the terminal object, meaning that you can think of proxies as the assembly language of coroutines. Following the analogy of assembly language, proxies provide fewer guarantees, but you can embed more restrictive coroutine abstractions (i.e. higher-level languages) within proxies. These higher-level languages impose greater restrictions, which enable more powerful abstractions (e.g. an Arrow instance).
If you want a trivial example of this, consider one of the simplest coroutine abstractions, the Kleisli arrow:
newtype Kleisli m a b = Kleisli { runKleisli :: a -> m b }

instance (Monad m) => Category (Kleisli m) where
    id = Kleisli return
    (Kleisli f) . (Kleisli g) = Kleisli (f <=< g)
Kleisli arrows are definitely more restrictive than proxies, but because of this restriction they support an Arrow instance. So whenever you need an Arrow instance you write your code using Kleisli arrows, combine it using Arrow notation, and then when you are done you can "compile" that higher-level Kleisli code to the proxy assembly code using mapMD:
kleisliToProxy :: (Monad m, Proxy p) => Kleisli m a b -> () -> Pipe p a b m r
kleisliToProxy (Kleisli f) = mapMD f
This compilation obeys the functor laws:
kleisliToProxy id = idT
kleisliToProxy (f . g) = kleisliToProxy f <-< kleisliToProxy g
So if your branching code can be written in terms of Kleisli arrows, then use Kleisli arrows for that section of the code and then compile it down to proxies when you are done. Using this trick, you can compile multiple coroutine abstractions down to the proxy abstraction to mix them.
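To make the Kleisli route concrete, here is a minimal sketch of the question's branching written with the Kleisli type and ArrowChoice combinators that ship in base's Control.Arrow. The names onLeft, onRight and branch are made up for illustration, and the snippet deliberately stops before the pipes step so that it runs without the pipes package.

```haskell
import Control.Arrow (Kleisli (..), arr, (>>>), (|||))

oddOrEven :: Int -> Either Int Int
oddOrEven x = if odd x then Left x else Right x

-- Each branch is its own Kleisli arrow. These two are pure, but they could
-- just as well perform effects in m. (Names are hypothetical.)
onLeft, onRight :: Monad m => Kleisli m Int (String, Int)
onLeft  = arr (\x -> ("seen on left", x))
onRight = arr (\x -> ("seen on right", x))

-- (|||) comes from ArrowChoice: Left values go to onLeft, Right to onRight.
branch :: Monad m => Kleisli m Int (String, Int)
branch = arr oddOrEven >>> (onLeft ||| onRight)

main :: IO ()
main = mapM (runKleisli branch) [1 .. 4] >>= mapM_ print
```

The function runKleisli branch has the shape a -> m b, which is what kleisliToProxy above hands to mapMD, so the whole diamond would enter the pipeline as a single stage.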