
How do I make a "branched" Conduit?

I want the same data to be split in two "branches" to be processed separately, then "joined"...

                                +----------+
                +---------+  -->| doubler  |---   +--------+
   +--------+   |         |--   +----------+   -->|        |   +------+
   | source |-->| splitter|                       | summer |-->| sink |
   +--------+   |         |--   +----------+   -->|        |   +------+
                +---------+  -->| delayer  |---   +--------+
                                +----------+

How should I do this?

My attempt:

import Data.Conduit
import Control.Monad.IO.Class
import qualified Data.Conduit.List as CL
-- import Data.Conduit.Internal (zipSources)
import Control.Arrow ((>>>))

source :: Source IO Int
source = do
    x <- liftIO $ getLine
    yield (read x)
    source

splitter :: Conduit Int IO (Int, Int)
splitter = CL.map $ \x -> (x,x)

doubler = CL.map (* 2)

delayer :: Conduit Int IO Int
delayer = do
    yield 0
    CL.map id

twoConduitBranches :: Monad m => Conduit a m b -> Conduit c m d -> Conduit (a,b) m (c,d)
twoConduitBranches q w = awaitForever $ \(x, y) -> do
    out1 <- undefined q x
    out2 <- undefined w y
    yield (out1, out2)


summer :: Conduit (Int,Int) IO Int
summer = CL.map $ \(x,y) -> x + y

sink :: Sink Int IO ()
sink = CL.mapM_ (show >>> putStrLn) 

-- combosrc = zipSources (source $= delayer) (source $= doubler)
main = source $= splitter $= twoConduitBranches doubler delayer $= summer $$ sink

What shall I write in place of the undefineds?

Vi. asked Jul 09 '14 18:07


1 Answer

You can do this, but it's ugly, and hopefully the implementation will make it clear why it's ugly and not a built-in feature of conduit:

twoConduitBranches :: Monad m => Conduit a m c -> Conduit b m d -> Conduit (a,b) m (c,d)
twoConduitBranches q w = getZipConduit
      (ZipConduit (CL.map fst =$= q =$= CL.map Left)
    <* ZipConduit (CL.map snd =$= w =$= CL.map Right)) =$= collapse
  where
    collapse = do
        v1 <- await
        case v1 of
            Nothing -> return ()
            Just (Left _) -> error "out of sequence 1"
            Just (Right d) -> do
                v2 <- await
                case v2 of
                    Nothing -> error "mismatched count"
                    Just (Right _) -> error "out of sequence 2"
                    Just (Left c) -> do
                        yield (c, d)
                        collapse

(Note: I tweaked your type signature a bit; I assume this is the one you really wanted.)

Here's the approach: turn q into a Conduit that takes the first value from each incoming tuple, and then wrap its output with Left. Similarly, we take the second value from each incoming tuple and pass it to w, and then wrap the output with Right.

Now that these Conduits have the same type (they take in the same input tuples, and generate the same Either values), we combine them using ZipConduit, which shares input amongst all components and coalesces the output into a single stream.
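To see the input sharing in isolation, here is a minimal sketch that leaves out the tuple handling and collapse. The names tagged, demo and demoSplit are made up for illustration, and it assumes a conduit version that still exports the $$ and =$= operators used elsewhere in this thread. Each branch receives every input, and the tagged outputs land in one stream.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Either (partitionEithers)
import Data.Functor.Identity (runIdentity)

-- Hypothetical example: two branches over the same Int input,
-- tagged with Left/Right so that their outputs share one type.
tagged :: Monad m => Conduit Int m (Either Int Int)
tagged = getZipConduit
      (ZipConduit (CL.map (* 2)   =$= CL.map Left)
    <* ZipConduit (CL.map (+ 100) =$= CL.map Right))

-- Run purely in Identity and collect the single combined stream.
demo :: [Either Int Int]
demo = runIdentity (CL.sourceList [1, 2, 3] $$ tagged =$= CL.consume)

-- Separate the stream again by tag; this deliberately ignores how
-- the two branches were interleaved.
demoSplit :: ([Int], [Int])
demoSplit = partitionEithers demo
```

Loading this in GHCi and evaluating demoSplit shows what each branch produced. The sketch makes no claim about the interleaving inside demo itself.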

This stream is a stream of Either c d, not the desired (c, d). To make this final conversion, we use collapse. It pops off a Right value and then a Left value, and puts them together into a single tuple, which it yields.

This function assumes that the sequence of output values will always be one value from w, and then one from q. If anything else happens, it will throw an exception. The problem is: there's nothing in conduit to imply that they will in fact generate output at the same rate. In fact, conduit is specifically designed to avoid that assumption!
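As a rough illustration of the rate problem, the sketch below puts a filtering branch next to a pass-through branch and counts what each one emits. The names uneven and unevenCounts are hypothetical, and it again assumes the older $$ / =$= operators are available.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Either (partitionEithers)
import Data.Functor.Identity (runIdentity)

-- Hypothetical example: the Left branch drops odd inputs, while the
-- Right branch passes everything through.
uneven :: Monad m => Conduit Int m (Either Int Int)
uneven = getZipConduit
      (ZipConduit (CL.filter even =$= CL.map Left)
    <* ZipConduit (CL.map Right))

-- Number of outputs from each branch for the inputs 1..4.
unevenCounts :: (Int, Int)
unevenCounts = (length ls, length rs)
  where
    (ls, rs) = partitionEithers
        (runIdentity (CL.sourceList [1, 2, 3, 4] $$ uneven =$= CL.consume))
```

Here unevenCounts is (2, 4): the two branches emit different numbers of values, so there is no sensible way to pair them one-to-one.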

So, if you know that your two component conduits will always produce output at the same rate, this function will work. But this won't be true generally speaking.
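If the rates do match but you would rather not depend on which branch emits first, one possible variation is to buffer whichever side is ahead. This is only a sketch under that assumption, not part of conduit: pairUp and pairDemo are hypothetical names, the buffers grow without bound if one side runs ahead, and unmatched values are silently dropped at the end.

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Functor.Identity (runIdentity)

-- Hypothetical helper: pair each Left with a Right in arrival order,
-- regardless of which tag shows up first.
pairUp :: Monad m => Conduit (Either c d) m (c, d)
pairUp = go [] []
  where
    -- ls and rs hold unmatched values, oldest first.
    -- At most one of the two lists is non-empty at any time.
    go ls rs = do
        mv <- await
        case mv of
            Nothing -> return ()  -- unmatched leftovers are dropped
            Just (Left c) -> case rs of
                (d:rs') -> yield (c, d) >> go ls rs'
                []      -> go (ls ++ [c]) rs
            Just (Right d) -> case ls of
                (c:ls') -> yield (c, d) >> go ls' rs
                []      -> go ls (rs ++ [d])

-- Small pure run with a mixed arrival order.
pairDemo :: [(Int, Char)]
pairDemo = runIdentity
    (CL.sourceList [Right 'a', Left 1, Left 2, Right 'b', Left 3]
        $$ pairUp =$= CL.consume)
```

In this run the trailing Left 3 never finds a partner and is discarded. Whether to drop it, raise an error as collapse does, or emit some default is a design choice that depends on the pipeline.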

Michael Snoyman answered Nov 10 '22 01:11