
Haskell: Splitting pipes (broadcast) without using spawn

This question is a bit codegolf and a lot newb.

I'm using the awesome pipes library in Haskell, and I'd like to split a pipe to send the same data along multiple channels (do a broadcast). The Pipes.Concurrent tutorial suggests using spawn to create mailboxes, taking advantage of Output's monoid status. For example, we might do something like this:

main = do
 (output1, input1) <- spawn Unbounded
 (output2, input2) <- spawn Unbounded
 let effect1 = fromInput input1 >-> pipe1
 let effect2 = fromInput input2 >-> pipe2
 let effect3 = P.stdinLn >-> toOutput (output1 <> output2)
 ...

Is this indirection through the mailboxes really necessary? Could we instead write something like this?

main = do
 let effect3 = P.stdinLn >-> (pipe1 <> pipe2)
 ...

The above doesn't compile, because Pipe doesn't have a Monoid instance. Is there a good reason for this? Is the first method really the cleanest way to split a pipe?
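For reference, the first snippet can be completed into something that runs. This is only a sketch under assumptions: it targets pipes-concurrency 2.x, where the buffer is spelled `unbounded` (older releases exported the `Unbounded` constructor used above), it uses `spawn'` so the mailboxes can be sealed explicitly, it relies on the separate async package, and the name `broadcast` plus the list-collecting readers are hypothetical stand-ins for pipe1 and pipe2.

```haskell
import Control.Concurrent.Async (async, wait)
import Pipes
import Pipes.Concurrent
import qualified Pipes.Prelude as P

-- | Send every element of the list to two mailboxes and return what each
-- reader collected. The readers stand in for pipe1 and pipe2.
broadcast :: [a] -> IO ([a], [a])
broadcast xs = do
    (output1, input1, seal1) <- spawn' unbounded
    (output2, input2, seal2) <- spawn' unbounded
    -- Each reader drains its own mailbox in a separate thread.
    reader1 <- async $ P.toListM (fromInput input1)
    reader2 <- async $ P.toListM (fromInput input2)
    -- The Monoid instance of Output duplicates every value to both mailboxes.
    runEffect $ each xs >-> toOutput (output1 <> output2)
    -- Sealing tells the readers that no further values will arrive.
    atomically (seal1 >> seal2)
    (,) <$> wait reader1 <*> wait reader2

main :: IO ()
main = broadcast ["a", "b", "c"] >>= print
```

Both readers should see the same values in the same order, since each mailbox is an ordered queue.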

emchristiansen asked Feb 14 '23 19:02



1 Answer

There are two ways to do this without using concurrency, both with caveats.

The first way applies if pipe1 and pipe2 are just simple Consumers that loop forever, like:

p1 = for cat f  -- i.e. p1 = forever $ await >>= f
p2 = for cat g  -- i.e. p2 = forever $ await >>= g

... then the easy way to solve this is to just write:

for P.stdinLn $ \str -> do
    f str
    g str

For example, if p1 is just printing every value:

p1 = for cat (lift . print)

... and p2 is just writing that value to a handle:

p2 = for cat (lift . hPutStrLn h)

... then you would combine them like so:

for P.stdinLn $ \str -> do
    lift $ print str
    lift $ hPutStrLn h str
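Here is a small self-contained sketch of the same idea. The loop bodies `f` and `g` and the name `broadcastLoop` are hypothetical; instead of performing I/O, each body yields a tagged copy of its input so the interleaving can be inspected as a plain list.

```haskell
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical loop bodies standing in for f and g. Each one reports what it
-- saw by yielding a tagged string.
f :: Monad m => String -> Producer String m ()
f str = yield ("f saw " ++ str)

g :: Monad m => String -> Producer String m ()
g str = yield ("g saw " ++ str)

-- | Run both bodies on every element of the source: f first, then g.
broadcastLoop :: Monad m => Producer String m () -> Producer String m ()
broadcastLoop source = for source $ \str -> do
    f str
    g str

main :: IO ()
main = mapM_ putStrLn (P.toList (broadcastLoop (each ["one", "two"])))
-- prints:
--   f saw one
--   g saw one
--   f saw two
--   g saw two
```

Each value is handed to both bodies before the next value is read, so no buffering or concurrency is involved.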

However, this simplification only works for Consumers that trivially loop. There's another solution that is more general, which is to define an ArrowChoice instance for pipes. I believe that pull-based Pipes do not permit a correct law-abiding instance, but push-based Pipes do:

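-- Imports assumed by the instances below (module names as of pipes 4.x)
import Prelude hiding (id, (.))
import Control.Arrow (Arrow (..), ArrowChoice (..))
import Control.Category (Category (..))
import Control.Monad ((>=>))
import Control.Monad.Trans.State.Strict (get, put)
import Pipes
import Pipes.Core
import Pipes.Internal (unsafeHoist)
import Pipes.Lift (evalStateP)

newtype Edge m r a b = Edge { unEdge :: a -> Pipe a b m r }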

instance (Monad m) => Category (Edge m r) where
    id = Edge push
    (Edge p2) . (Edge p1) = Edge (p1 >~> p2)

instance (Monad m) => Arrow (Edge m r) where
    arr f = Edge (push />/ respond . f)
    first (Edge p) = Edge $ \(b, d) ->
        evalStateP d $ (up \>\ unsafeHoist lift . p />/ dn) b
      where
        up () = do
            (b, d) <- request ()
            lift $ put d
            return b
        dn c = do
            d <- lift get
            respond (c, d)

instance (Monad m) => ArrowChoice (Edge m r) where
    left (Edge k) = Edge (bef >=> (up \>\ (k />/ dn)))
      where
          bef x = case x of
              Left b -> return b
              Right d -> do
                  _ <- respond (Right d)
                  x2 <- request ()
                  bef x2
          up () = do
              x <- request ()
              bef x
          dn c = respond (Left c)

This requires a newtype so that the type parameters are in the order that ArrowChoice expects.

If you're unfamiliar with the term push-based Pipe, it's basically a Pipe whose evaluation begins from the most upstream stage instead of the most downstream one. Push-based Pipes all have the following shape:

a -> Pipe a b m r

Think of it as a Pipe that cannot "go" until it receives at least one value from upstream.

These push-based Pipes are the "dual" to conventional pull-based Pipes, complete with their own composition operator and identity:

(>~>) :: (Monad m)
      => (a -> Pipe a b m r)
      -> (b -> Pipe b c m r)
      -> (a -> Pipe a c m r)

push  :: (Monad m)
      =>  a -> Pipe a a m r

... but the unidirectional Pipes API does not export these by default. You can only get these operators from Pipes.Core (and you may want to study that module more closely to build an intuition for how they work). That module shows that push-based Pipes and pull-based Pipes are both special cases of more general bidirectional versions, and understanding the bidirectional case is how you learn why they are duals of each other.
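To make the shape concrete, here is a minimal sketch of two hand-written push-based Pipes composed with `(>~>)` and fed from a Producer with `(>>~)`, which is also exported by Pipes.Core. The names `doubler`, `labeler`, `pipeline`, and `runPush` are hypothetical.

```haskell
import Pipes
import Pipes.Core (push, (>>~), (>~>))
import qualified Pipes.Prelude as P

-- | A push-based pipe takes its first input as an argument, so it cannot
-- start until upstream has supplied a value.
doubler :: Monad m => Int -> Pipe Int Int m r
doubler n = do
    yield (n * 2)
    await >>= doubler

labeler :: Monad m => Int -> Pipe Int String m r
labeler n = do
    yield ("value " ++ show n)
    await >>= labeler

-- | Push-based composition; `push` is the identity, so it changes nothing.
pipeline :: Monad m => Int -> Pipe Int String m r
pipeline = push >~> doubler >~> labeler

-- | Feed a list into the push-based pipeline and collect the results.
runPush :: [Int] -> [String]
runPush xs = P.toList (each xs >>~ pipeline)

main :: IO ()
main = mapM_ putStrLn (runPush [1, 2, 3])
-- prints:
--   value 2
--   value 4
--   value 6
```

The `(>>~)` operator is what hands the first value from the Producer to the push-based function, which is why nothing downstream runs on an empty source.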

Once you have an Arrow instance for push-based pipes, you can write something like:

p >>> bifurcate >>> (p1 +++ p2)
  where
    bifurcate = Edge $ push ~> \a -> do
        yield (Left  a)  -- First give `p1` the value
        yield (Right a)  -- Then give `p2` the value

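Then you would use unEdge to unwrap the result back into a plain push-based pipe (a function of type a -> Pipe a b m r) when you are done, and feed it from a Producer with (>>~) from Pipes.Core.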
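Putting the pieces together, the following sketch repeats the instances from this answer so that it forms one standalone module, then broadcasts a list through two branches. The import list, the top-level `bifurcate` (written with `push`), the `broadcast` helper, and the two arithmetic branches are assumptions chosen for illustration; `Pipes.Internal.unsafeHoist` and `Pipes.Lift.evalStateP` are taken from pipes 4.x.

```haskell
import Prelude hiding (id, (.))
import Control.Arrow (Arrow (..), ArrowChoice (..), (>>>))
import Control.Category (Category (..))
import Control.Monad ((>=>))
import Control.Monad.Trans.State.Strict (get, put)
import Pipes
import Pipes.Core (push, request, respond, (/>/), (\>\), (>>~), (>~>))
import Pipes.Internal (unsafeHoist)
import Pipes.Lift (evalStateP)
import qualified Pipes.Prelude as P

newtype Edge m r a b = Edge { unEdge :: a -> Pipe a b m r }

instance (Monad m) => Category (Edge m r) where
    id = Edge push
    (Edge p2) . (Edge p1) = Edge (p1 >~> p2)

instance (Monad m) => Arrow (Edge m r) where
    arr f = Edge (push />/ respond . f)
    first (Edge p) = Edge $ \(b, d) ->
        evalStateP d $ (up \>\ unsafeHoist lift . p />/ dn) b
      where
        up () = do
            (b, d) <- request ()
            lift $ put d
            return b
        dn c = do
            d <- lift get
            respond (c, d)

instance (Monad m) => ArrowChoice (Edge m r) where
    left (Edge k) = Edge (bef >=> (up \>\ (k />/ dn)))
      where
        bef x = case x of
            Left b -> return b
            Right d -> do
                _ <- respond (Right d)
                x2 <- request ()
                bef x2
        up () = do
            x <- request ()
            bef x
        dn c = respond (Left c)

mapEdge :: Monad m => (a -> b) -> Edge m r a b
mapEdge f = Edge $ push ~> yield . f

-- | Duplicate every input: one copy tagged Left, one tagged Right.
bifurcate :: Monad m => Edge m r a (Either a a)
bifurcate = Edge $ push ~> \a -> do
    yield (Left a)
    yield (Right a)

-- | Hypothetical demo: the Left branch adds 1, the Right branch multiplies by 10.
broadcast :: [Int] -> [Either Int Int]
broadcast xs = P.toList (each xs >>~ unEdge edge)
  where
    edge = bifurcate >>> (mapEdge (+ 1) +++ mapEdge (* 10))

main :: IO ()
main = print (broadcast [1, 2])
-- prints [Left 2,Right 10,Left 3,Right 20]
```

Each input reaches both branches before the next input is read, and the `Either` tag records which branch produced each output.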

This approach has one major drawback, which is that you can't automatically upgrade a pull-based pipe to a push-based pipe (but usually it's not hard to figure out how to do it manually). For example, to upgrade Pipes.Prelude.map to be a push-based Pipe, you would write:

mapPush :: (Monad m) => (a -> b) -> (a -> Pipe a b m r)
mapPush f a = do
    yield (f a)
    Pipes.Prelude.map f

Then that has the right type to be wrapped up in the Arrow:

mapEdge :: (Monad m) => (a -> b) -> Edge m r a b
mapEdge f = Edge (mapPush f)

Of course, an even simpler way would be just to write it from scratch:

mapEdge f = Edge $ push ~> yield . f

Use whichever approach suits you best.
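The same manual upgrade carries over to other pull-based pipes: handle the first value yourself, then hand control to the pull-based version. As a sketch, here is Pipes.Prelude.filter upgraded that way; the names `filterPush` and `evens` are hypothetical.

```haskell
import Control.Monad (when)
import Pipes
import Pipes.Core ((>>~))
import qualified Pipes.Prelude as P

-- | Push-based filter: decide about the first value, then defer to the
-- ordinary pull-based filter for the rest of the stream.
filterPush :: Monad m => (a -> Bool) -> a -> Pipe a a m r
filterPush keep a = do
    when (keep a) (yield a)
    P.filter keep

-- | Feed a list through the push-based filter and collect the survivors.
evens :: [Int] -> [Int]
evens xs = P.toList (each xs >>~ filterPush even)

main :: IO ()
main = print (evens [1 .. 6])
-- prints [2,4,6]
```

Wrapping the result as `Edge (filterPush keep)` would then give it the type needed for the Arrow combinators.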

In fact, I came up with the Arrow and ArrowChoice instances precisely because I was trying to answer the exact same question as you: how do you solve these kinds of problems without using concurrency? I wrote up a long answer about this more general subject in another Stack Overflow answer, where I describe how you can use these Arrow and ArrowChoice instances to distill concurrent systems into equivalent pure ones.

Gabriella Gonzalez answered Feb 24 '23 14:02
