This question is a bit codegolf and a lot newb.
I'm using the awesome pipes library in Haskell, and I'd like to split a pipe to send the same data along multiple channels (do a broadcast).
The Pipes.Concurrent tutorial suggests using spawn to create mailboxes, taking advantage of Output's monoid status. For example, we might do something like this:
main = do
    (output1, input1) <- spawn Unbounded
    (output2, input2) <- spawn Unbounded
    let effect1 = fromInput input1 >-> pipe1
    let effect2 = fromInput input2 >-> pipe2
    let effect3 = P.stdinLn >-> toOutput (output1 <> output2)
    ...
Is this indirection through the mailboxes really necessary? Could we instead write something like this?
main = do
    let effect3 = P.stdinLn >-> (pipe1 <> pipe2)
    ...
The above doesn't compile, because Pipe doesn't have a Monoid instance.
Is there a good reason for this?
Is the first method really the cleanest way to split a pipe?
There are two ways to do this without using concurrency, both with caveats.
The first way is that if pipe1 and pipe2 are just simple Consumers that loop forever like:
p1 = for cat f -- i.e. p1 = forever $ await >>= f
p2 = for cat g -- i.e. p2 = forever $ await >>= g
... then the easy way to solve this is to just write:
for P.stdinLn $ \str -> do
    f str
    g str
For example, if p1 is just printing every value:
p1 = for cat (lift . print)
... and p2 is just writing that value to a handle:
p2 = for cat (lift . hPutStrLn h)
... then you would combine them like so:
for P.stdinLn $ \str -> do
    lift $ print str
    lift $ hPutStrLn h str
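Here is a minimal, self-contained sketch of that fused loop. To keep it deterministic it makes a few assumptions that are not in the snippets above: each replaces P.stdinLn, and the two sinks are hypothetical IORef-backed recorders standing in for print and hPutStrLn h. The names record and broadcastBoth are invented for illustration.

```haskell
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)
import Pipes

-- Hypothetical sink: appends each value to a list held in an IORef.
-- It stands in for `print` or `hPutStrLn h` from the text above.
record :: IORef [a] -> a -> IO ()
record ref x = modifyIORef' ref (++ [x])

-- Run both sinks on every element of one producer inside a single
-- `for` loop. No mailboxes or threads are involved.
broadcastBoth :: [String] -> IO ([String], [String])
broadcastBoth inputs = do
    refA <- newIORef []
    refB <- newIORef []
    runEffect $ for (each inputs) $ \str -> do
        lift $ record refA str            -- plays the role of `f str`
        lift $ record refB (reverse str)  -- plays the role of `g str`
    (,) <$> readIORef refA <*> readIORef refB
```

Both sinks see every element in the same order, because the loop body runs them one after the other for each value.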
However, this simplification only works for Consumers that trivially loop. There's another solution that is more general, which is to define an ArrowChoice instance for pipes. I believe that pull-based Pipes do not permit a correct law-abiding instance, but push-based Pipes do:
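import Prelude hiding (id, (.))
import Control.Arrow (Arrow (..), ArrowChoice (..), (>>>))
import Control.Category (Category (..))
import Control.Monad ((>=>))
import Control.Monad.Trans.State.Strict (get, put)
import Pipes
import Pipes.Core
import Pipes.Internal (unsafeHoist)
import Pipes.Lift (evalStateP)

newtype Edge m r a b = Edge { unEdge :: a -> Pipe a b m r }

instance (Monad m) => Category (Edge m r) where
    id = Edge push
    (Edge p2) . (Edge p1) = Edge (p1 >~> p2)

instance (Monad m) => Arrow (Edge m r) where
    arr f = Edge (push />/ respond . f)
    first (Edge p) = Edge $ \(b, d) ->
        evalStateP d $ (up \>\ unsafeHoist lift . p />/ dn) b
      where
        -- Stash the second component in the state before handing `b` to `p`
        up () = do
            (b, d) <- request ()
            lift $ put d
            return b
        -- Re-attach the stashed component to each output of `p`
        dn c = do
            d <- lift get
            respond (c, d)

instance (Monad m) => ArrowChoice (Edge m r) where
    left (Edge k) = Edge (bef >=> (up \>\ (k />/ dn)))
      where
        -- Pass `Right` values straight through until a `Left` arrives
        bef x = case x of
            Left b -> return b
            Right d -> do
                _ <- respond (Right d)
                x2 <- request ()
                bef x2
        up () = do
            x <- request ()
            bef x
        dn c = respond (Left c)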
This requires a newtype so that the type parameters are in the order that ArrowChoice expects.
If you're unfamiliar with the term, a push-based Pipe is basically a Pipe that begins from the most upstream pipe instead of the most downstream pipe. They all have the following shape:
a -> Pipe a b m r
Think of it as a Pipe that cannot "go" until it receives at least one value from upstream.
These push-based Pipes are the "dual" to conventional pull-based Pipes, complete with their own composition operator and identity:
(>~>) :: (Monad m)
      => (a -> Pipe a b m r)
      -> (b -> Pipe b c m r)
      -> (a -> Pipe a c m r)

push  :: (Monad m)
      => a -> Pipe a a m r
... but the unidirectional Pipes API does not export these by default. You can only get these operators from Pipes.Core (and you may want to study that module more closely to build an intuition for how they work). That module shows that push-based Pipes and pull-based Pipes are both special cases of more general bidirectional versions, and understanding the bidirectional case is how you learn why they are duals of each other.
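To make the push-based shape a bit more concrete, here is a small sketch that composes two hand-written push-based pipes with (>~>) and then runs them as an ordinary pull-based pipe. The names doubler, incrementer, composed, fromPush, and runComposed are made up for this example, and fromPush is just one assumed way to get back to a pull-based pipe (await the first value, then hand it over).

```haskell
import Pipes
import Pipes.Core (push, (>~>))
import qualified Pipes.Prelude as P

-- A push-based pipe takes its first input as an argument, then loops.
doubler :: Monad m => Int -> Pipe Int Int m r
doubler a = do
    yield (a * 2)
    await >>= doubler

incrementer :: Monad m => Int -> Pipe Int Int m r
incrementer a = do
    yield (a + 1)
    await >>= incrementer

-- Push-based composition; `push` acts as the identity, so it changes nothing.
composed :: Monad m => Int -> Pipe Int Int m r
composed = push >~> doubler >~> incrementer

-- Hypothetical helper: await the first value, then start the push-based pipe.
fromPush :: Monad m => (a -> Pipe a b m r) -> Pipe a b m r
fromPush k = await >>= k

-- Run the composed pipe over a list and collect the results purely.
runComposed :: [Int] -> [Int]
runComposed xs = P.toList (each xs >-> fromPush composed)
```

Each input is doubled first and then incremented, so runComposed [1, 2, 3] evaluates to [3, 5, 7].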
Once you have an Arrow instance for push-based pipes, you can write something like:
p >>> bifurcate >>> (p1 +++ p2)
  where
    bifurcate = Edge $ push ~> \a -> do
        yield (Left  a)  -- First give `p1` the value
        yield (Right a)  -- Then give `p2` the value
Then you would use a helper like runEdge (not defined above; it can be written as runEdge e = await >>= unEdge e) to convert that to a pull-based pipe when you are done.
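Putting the pieces together, here is a sketch of the whole split as one module. It repeats the Edge instances so that it stands on its own, and it makes some assumptions: runEdge is written as await >>= unEdge e, bifurcate is built from push, the two branches are simple mapEdge stages, and each replaces a real input source. splitDemo is a name invented for this example.

```haskell
import Prelude hiding (id, (.))
import Control.Arrow (Arrow (..), ArrowChoice (..), (>>>))
import Control.Category (Category (..))
import Control.Monad ((>=>))
import Control.Monad.Trans.State.Strict (get, put)
import Pipes
import Pipes.Core
import Pipes.Internal (unsafeHoist)
import Pipes.Lift (evalStateP)
import qualified Pipes.Prelude as P

newtype Edge m r a b = Edge { unEdge :: a -> Pipe a b m r }

instance Monad m => Category (Edge m r) where
    id = Edge push
    Edge p2 . Edge p1 = Edge (p1 >~> p2)

instance Monad m => Arrow (Edge m r) where
    arr f = Edge (push />/ respond . f)
    first (Edge p) = Edge $ \(b, d) ->
        evalStateP d $ (up \>\ unsafeHoist lift . p />/ dn) b
      where
        up () = do
            (b, d) <- request ()
            lift $ put d
            return b
        dn c = do
            d <- lift get
            respond (c, d)

instance Monad m => ArrowChoice (Edge m r) where
    left (Edge k) = Edge (bef >=> (up \>\ (k />/ dn)))
      where
        bef x = case x of
            Left b -> return b
            Right d -> do
                _ <- respond (Right d)
                x2 <- request ()
                bef x2
        up () = do
            x <- request ()
            bef x
        dn c = respond (Left c)

-- Assumed helper: await the first value, then start the push-based pipe.
runEdge :: Monad m => Edge m r a b -> Pipe a b m r
runEdge e = await >>= unEdge e

mapEdge :: Monad m => (a -> b) -> Edge m r a b
mapEdge f = Edge $ push ~> yield . f

-- Send every input down both branches, left branch first.
bifurcate :: Monad m => Edge m r a (Either a a)
bifurcate = Edge $ push ~> \a -> do
    yield (Left a)
    yield (Right a)

-- Left branch adds one, right branch doubles; results stay tagged.
splitDemo :: [Int] -> [Either Int Int]
splitDemo xs =
    P.toList (each xs >-> runEdge (bifurcate >>> (mapEdge (+ 1) +++ mapEdge (* 2))))
```

Working through it by hand, splitDemo [1, 2] should give [Left 2, Right 2, Left 3, Right 4]: each input reaches the left branch first and the right branch second, with no threads or mailboxes involved.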
This approach has one major drawback, which is that you can't automatically upgrade a pull-based pipe to a push-based pipe (but usually it's not hard to figure out how to do it manually). For example, to upgrade Pipes.Prelude.map to be a push-based Pipe, you would write:
mapPush :: (Monad m) => (a -> b) -> (a -> Pipe a b m r)
mapPush f a = do
    yield (f a)
    Pipes.Prelude.map f
Then that has the right type to be wrapped up in the Arrow:
mapEdge :: (Monad m) => (a -> b) -> Edge m r a b
mapEdge f = Edge (mapPush f)
Of course, an even simpler way would be just to write it from scratch:
mapEdge f = Edge $ push ~> yield . f
Use whichever approach suits you best.
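As a quick sanity check that the two styles behave the same, here is a small sketch that runs both over a list. The names mapPushManual, mapPushScratch, viaManual, and viaScratch are invented for this example, and feeding the first value with await >>= is an assumed way of getting back to a pull-based pipe.

```haskell
import Pipes
import Pipes.Core (push)
import qualified Pipes.Prelude as P

-- Upgrade by hand: handle the first value, then defer to the pull-based map.
mapPushManual :: Monad m => (a -> b) -> a -> Pipe a b m r
mapPushManual f a = do
    yield (f a)
    P.map f

-- Written from scratch on top of `push`.
mapPushScratch :: Monad m => (a -> b) -> a -> Pipe a b m r
mapPushScratch f = push ~> yield . f

-- Run each version over a list and collect the results purely.
viaManual, viaScratch :: [Int] -> [Int]
viaManual xs = P.toList (each xs >-> (await >>= mapPushManual (+ 1)))
viaScratch xs = P.toList (each xs >-> (await >>= mapPushScratch (+ 1)))
```

Both viaManual [1, 2, 3] and viaScratch [1, 2, 3] evaluate to [2, 3, 4].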
In fact, I came up with the Arrow and ArrowChoice instances precisely because I was trying to answer the exact same question as you: how do you solve these kinds of problems without using concurrency? I wrote up a long answer about this more general subject in another Stack Overflow answer here, where I describe how you can use these Arrow and ArrowChoice instances to distill concurrent systems into equivalent pure ones.