I want the same data to be split into two "branches", processed separately, and then "joined" again:
```
                            +----------+
             +---------+ -->| doubler  |--- +--------+
+--------+   |         |--  +----------+ -->|        |   +------+
| source |-->| splitter|                    | summer |-->| sink |
+--------+   |         |--  +----------+ -->|        |   +------+
             +---------+ -->| delayer  |--- +--------+
                            +----------+
```
How should I do this?
My attempt:
```haskell
import Data.Conduit
import Control.Monad.IO.Class
import qualified Data.Conduit.List as CL
-- import Data.Conduit.Internal (zipSources)
import Control.Arrow ((>>>))

source :: Source IO Int
source = do
    x <- liftIO $ getLine
    yield (read x)
    source

splitter :: Conduit Int IO (Int, Int)
splitter = CL.map $ \x -> (x,x)

doubler = CL.map (* 2)

delayer :: Conduit Int IO Int
delayer = do
    yield 0
    CL.map id

twoConduitBranches :: Monad m => Conduit a m b -> Conduit c m d -> Conduit (a,b) m (c,d)
twoConduitBranches q w = awaitForever $ \(x, y) -> do
    out1 <- undefined q x
    out2 <- undefined w y
    yield (out1, out2)

summer :: Conduit (Int,Int) IO Int
summer = CL.map $ \(x,y) -> x + y

sink :: Sink Int IO ()
sink = CL.mapM_ (show >>> putStrLn)

-- combosrc = zipSources (source $= delayer) (source $= doubler)
main = source $= splitter $= twoConduitBranches doubler delayer $= summer $$ sink
```
What should I write in place of the `undefined`s?
You can do this, but it's ugly, and hopefully the implementation will make it clear why it's ugly and not a built-in feature of conduit:
```haskell
twoConduitBranches :: Monad m => Conduit a m c -> Conduit b m d -> Conduit (a,b) m (c,d)
twoConduitBranches q w = getZipConduit
        (ZipConduit (CL.map fst =$= q =$= CL.map Left)
      <* ZipConduit (CL.map snd =$= w =$= CL.map Right)) =$= collapse
  where
    collapse = do
        v1 <- await
        case v1 of
            Nothing -> return ()
            Just (Left _) -> error "out of sequence 1"
            Just (Right d) -> do
                v2 <- await
                case v2 of
                    Nothing -> error "mismatched count"
                    Just (Right _) -> error "out of sequence 2"
                    Just (Left c) -> do
                        yield (c, d)
                        collapse
```
(Note: I tweaked your type signature a bit, I assume this is the type signature you really wanted.)
Here's the approach: turn `q` into a `Conduit` that takes the first value from each incoming tuple, and then wrap its output with `Left`. Similarly, we take the second value from each incoming tuple and pass it to `w`, and then wrap the output with `Right`.
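A minimal sketch of the `Left`-tagged branch on its own, assuming conduit 1.3, where `ConduitT`, `.|` and `runConduitPure` take the place of the older `Conduit` synonym and `=$=`. The names `doubleFirst` and `tagged` are hypothetical, and `CL.map (* 2)` stands in for `doubler`:

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL

-- Hypothetical stand-in for `CL.map fst =$= doubler =$= CL.map Left`:
-- project the first component, run the branch, tag each output with Left.
doubleFirst :: Monad m => ConduitT (Int, Int) (Either Int Int) m ()
doubleFirst = CL.map fst .| CL.map (* 2) .| CL.map Left

-- Feed two tuples through the tagged branch and collect its output.
tagged :: [Either Int Int]
tagged = runConduitPure (CL.sourceList [(1, 10), (2, 20)] .| doubleFirst .| CL.consume)

main :: IO ()
main = print tagged
```

This prints `[Left 2,Left 4]`: the second components are ignored by this branch. The `Right` branch is built the same way from `CL.map snd`, `w` and `CL.map Right`.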
Now that these `Conduit`s have the same type (they take in the same input tuples, and generate the same `Either` values), we combine them using `ZipConduit`, which shares input amongst all components and coalesces the output into a single stream.
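A small sketch of that coalescing, assuming conduit 1.3 names (`ConduitT`, `.|`, `runConduitPure`). `bothBranches` and `zipped` are hypothetical names, with a doubling left branch and a pass-through right branch. Every tuple reaches both branches, and since the `ZipConduit` documentation describes output handling as left-biased, the `Left` produced for an input comes out before the `Right` produced for the same input:

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL

-- Both branches receive every incoming tuple; their tagged outputs
-- are merged into a single stream of Either values.
bothBranches :: Monad m => ConduitT (Int, Int) (Either Int Int) m ()
bothBranches = getZipConduit
  (ZipConduit (CL.map fst .| CL.map (* 2) .| CL.map Left)
     <* ZipConduit (CL.map snd .| CL.map Right))

zipped :: [Either Int Int]
zipped = runConduitPure (CL.sourceList [(1, 10), (2, 20)] .| bothBranches .| CL.consume)

main :: IO ()
main = print zipped
```

Running it should print `[Left 2,Right 10,Left 4,Right 20]`: one interleaved stream, which still has to be turned back into pairs.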
This stream is a stream of `Either c d`, not the desired `(c, d)`. To make this final conversion, we use `collapse`. It pops off a `Right` value and then a `Left` value, and puts them together into a single tuple, which it yields.
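To try `collapse` in isolation, here is the same function written with conduit 1.3 types (an assumption about the library version), fed a hand-written stream in the order it expects: a `Right`, then a `Left`. `pairs` is a hypothetical name:

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL

-- The answer's collapse: read a Right, then a Left, and emit them as a pair.
-- Any other ordering, or a dangling Right at end of stream, is an error.
collapse :: Monad m => ConduitT (Either c d) (c, d) m ()
collapse = do
  v1 <- await
  case v1 of
    Nothing -> return ()
    Just (Left _) -> error "out of sequence 1"
    Just (Right d) -> do
      v2 <- await
      case v2 of
        Nothing -> error "mismatched count"
        Just (Right _) -> error "out of sequence 2"
        Just (Left c) -> yield (c, d) >> collapse

pairs :: [(Int, Int)]
pairs = runConduitPure
  (CL.sourceList [Right 0, Left 2, Right 1, Left 4] .| collapse .| CL.consume)

main :: IO ()
main = print pairs
```

This prints `[(2,0),(4,1)]`. Starting the stream with a `Left` instead would hit the `"out of sequence 1"` error.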
This function assumes that the sequence of output values will always be one value from `w`, and then one from `q`. If anything else happens, it will throw an exception. The problem is: there's nothing in conduit to imply that they will in fact generate output at the same rate. In fact, conduit is specifically designed to avoid that assumption!
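In the question's pipeline that order does hold: `delayer` yields its `0` before reading any input, so the first value out of the zipped stream is a `Right`, and after that each input produces a `Left` followed by a `Right` because `ZipConduit` output is left-biased. The following self-contained sketch ports the whole pipeline to conduit 1.3 names (an assumption about the library version) and replaces the `getLine` source with a hypothetical finite list. Only three results are taken because `delayer` emits one more value than `doubler`, so asking `collapse` for a fourth pair would reach its `"mismatched count"` error:

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL

doubler :: Monad m => ConduitT Int Int m ()
doubler = CL.map (* 2)

-- Emit 0 first, then pass every input through: a one-step delay.
delayer :: Monad m => ConduitT Int Int m ()
delayer = yield 0 >> CL.map id

twoConduitBranches
  :: Monad m
  => ConduitT a c m ()
  -> ConduitT b d m ()
  -> ConduitT (a, b) (c, d) m ()
twoConduitBranches q w =
  getZipConduit
      (ZipConduit (CL.map fst .| q .| CL.map Left)
         <* ZipConduit (CL.map snd .| w .| CL.map Right))
    .| collapse
  where
    -- Same pairing logic as the answer: a Right, then a Left.
    collapse = do
      v1 <- await
      case v1 of
        Nothing -> return ()
        Just (Left _) -> error "out of sequence 1"
        Just (Right d) -> do
          v2 <- await
          case v2 of
            Nothing -> error "mismatched count"
            Just (Right _) -> error "out of sequence 2"
            Just (Left c) -> yield (c, d) >> collapse

results :: [Int]
results = runConduitPure $
  CL.sourceList [1, 2, 3]              -- hypothetical stand-in for the getLine source
    .| CL.map (\x -> (x, x))           -- splitter
    .| twoConduitBranches doubler delayer
    .| CL.map (uncurry (+))            -- summer
    .| CL.take 3                       -- stop before collapse runs out of input

main :: IO ()
main = print results
```

This prints `[2,5,8]`, i.e. `2 * x_i + x_(i-1)` with `x_0 = 0`. Swapping the arguments to `twoConduitBranches delayer doubler` breaks the assumed order, and `collapse` fails immediately.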
So, if you know that your two component conduits will always produce output at the same rate, this function will work. But this won't be true generally speaking.
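If the branches cannot be trusted to alternate, one option (a different technique from the answer's, sketched here under the assumption that unbounded buffering is acceptable) is to queue whichever side is ahead and pair values in arrival order. `collapseBuffered` and `paired` are hypothetical names; the code assumes conduit 1.3 and uses `Data.Sequence` from the containers package:

```haskell
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Sequence (ViewL (..), viewl, (|>))
import qualified Data.Sequence as Seq

-- Pair the i-th Left with the i-th Right regardless of interleaving.
-- At most one of the two queues is non-empty at any time.
collapseBuffered :: Monad m => ConduitT (Either c d) (c, d) m ()
collapseBuffered = go Seq.empty Seq.empty
  where
    go lefts rights = do
      mv <- await
      case mv of
        Nothing -> return ()  -- unmatched values still queued are dropped
        Just (Left c) -> case viewl rights of
          d :< rest -> yield (c, d) >> go lefts rest
          EmptyL -> go (lefts |> c) rights
        Just (Right d) -> case viewl lefts of
          c :< rest -> yield (c, d) >> go rest rights
          EmptyL -> go lefts (rights |> d)

paired :: [(Int, Char)]
paired = runConduitPure
  (CL.sourceList [Left 1, Left 2, Right 'a', Right 'b', Left 3] .| collapseBuffered .| CL.consume)

main :: IO ()
main = print paired
```

This prints `[(1,'a'),(2,'b')]`. The trade-off matches the concern above: when one branch runs far ahead of the other, its queue grows without bound, and values left unmatched at end of stream are silently discarded.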