Conduit Broadcast

A few days ago, I asked this question. Now I need a purely single-threaded version of that function.

To repeat, I need a function that sends each received value to each sink and collects their results. The type signature of the function should be something like this:

broadcast :: [Sink a m b] -> Sink a m [b]

Best,
Sven


P.S. It is not sequence; I've tried that:

> C.sourceList [1..100] $$ sequence [C.fold (+) 0, C.fold (+) 0]
[5050, 0]

expected result:

[5050, 5050]

P.P.S. zipSinks gives the desired result, but it works just with tuples:

> C.sourceList [1..100] $$ C.zipSinks (C.fold (+) 0) (C.fold (+) 0)
(5050, 5050)

asked Aug 15 '13 11:08 by SvenK


1 Answer

Basically all we need to do is to reimplement sequence, but with zipSinks instead of the original sequencing operation:

import Data.Conduit as C
import Data.Conduit.List as C
import Data.Conduit.Util as C

fromPairs
    :: (Functor f)
    => f [a]                        -- ^ an empty list to start with
    -> (f a -> f [a] -> f (a, [a])) -- ^ a combining function
    -> [f a]                        -- ^ input list
    -> f [a]                        -- ^ combined list
fromPairs empty comb = g
  where
    g []     = empty
    g (x:xs) = uncurry (:) `fmap` (x `comb` g xs)
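
To see what fromPairs does on its own, here is a minimal sketch that applies it to Maybe instead of sinks. The combining function pairMaybe is a hypothetical helper introduced only for this illustration; with it, fromPairs should behave like sequence for Maybe. The definition of fromPairs is repeated so that the sketch compiles by itself.

```haskell
-- fromPairs as defined above, repeated so this sketch is self-contained.
fromPairs
    :: (Functor f)
    => f [a]                        -- an empty list to start with
    -> (f a -> f [a] -> f (a, [a])) -- a combining function
    -> [f a]                        -- input list
    -> f [a]                        -- combined list
fromPairs empty comb = g
  where
    g []     = empty
    g (x:xs) = uncurry (:) `fmap` (x `comb` g xs)

-- Hypothetical combining function for Maybe (not part of any library):
-- it succeeds only if both sides succeed.
pairMaybe :: Maybe a -> Maybe b -> Maybe (a, b)
pairMaybe (Just a) (Just b) = Just (a, b)
pairMaybe _        _        = Nothing

main :: IO ()
main = do
    -- All elements present: the results are collected in order.
    print (fromPairs (Just []) pairMaybe [Just 1, Just 2, Just (3 :: Int)])
    -- One element missing: the whole combination fails.
    print (fromPairs (Just []) pairMaybe [Just 1, Nothing, Just (3 :: Int)])
```

The first call prints Just [1,2,3] and the second prints Nothing. Swapping pairMaybe for zipSinks changes only how two computations are paired, not how the list is traversed.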

Now creating broadcast is just applying fromPairs to zipSinks:

broadcast :: (Monad m) => [Sink a m b] -> Sink a m [b]
broadcast = fromPairs (return []) zipSinks

And we can do something like

main = print =<< (C.sourceList [1..100] $$ broadcast [C.fold (+) 0, C.fold (*) 1])

Update: We can see that fromPairs looks just like sequenceA, so we can push the idea even further. Let's define a zipping applicative functor on conduits, similar to ZipList:

import Control.Applicative
import Control.Monad
import Data.Conduit
import Data.Conduit.Util
import Data.Traversable (Traversable(..), sequenceA)

newtype ZipSink i m r = ZipSink { getZipSink :: Sink i m r }

instance Monad m => Functor (ZipSink i m) where
    fmap f (ZipSink x) = ZipSink (liftM f x)
instance Monad m => Applicative (ZipSink i m) where
    pure  = ZipSink . return
    (ZipSink f) <*> (ZipSink x) =
         ZipSink $ liftM (uncurry ($)) $ zipSinks f x

Then broadcast becomes as simple as

broadcast :: (Traversable f, Monad m) => f (Sink i m r) -> Sink i m (f r)
broadcast = getZipSink . sequenceA . fmap ZipSink
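
The zipping idea can also be tried without conduit at all. The following is only a sketch under stated assumptions: Fold is a hypothetical toy stand-in for a sink (a strict left fold with a hidden accumulator), and runFold, foldWith and broadcastFolds are names made up for this example. Its Applicative instance feeds every input to both folds, which is the role zipSinks plays above, so sequenceA broadcasts the input in the same way.

```haskell
{-# LANGUAGE ExistentialQuantification #-}

import Data.List (foldl')

-- Hypothetical toy model of a sink: a step function, a start state,
-- and a function that extracts the final result from the state.
data Fold i r = forall s. Fold (s -> i -> s) s (s -> r)

instance Functor (Fold i) where
    fmap f (Fold step start done) = Fold step start (f . done)

-- Zipping Applicative: both folds see every input element.
instance Applicative (Fold i) where
    pure r = Fold (\() _ -> ()) () (const r)
    Fold stepF startF doneF <*> Fold stepX startX doneX =
        Fold step (startF, startX) done
      where
        step (sf, sx) i = (stepF sf i, stepX sx i)
        done (sf, sx)   = doneF sf (doneX sx)

-- Run a fold over a list in a single pass.
runFold :: Fold i r -> [i] -> r
runFold (Fold step start done) = done . foldl' step start

-- Build a fold from a step function and a start value.
foldWith :: (r -> i -> r) -> r -> Fold i r
foldWith step start = Fold step start id

-- With the zipping Applicative, broadcasting is just sequenceA.
broadcastFolds :: Traversable f => f (Fold i r) -> Fold i (f r)
broadcastFolds = sequenceA

main :: IO ()
main = print (runFold (broadcastFolds [foldWith (+) 0, foldWith (+) 0])
                      [1 .. 100 :: Integer])
```

This prints [5050,5050], the result the question expects, because each element of the input list is handed to every fold in one traversal.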

answered Nov 09 '22 09:11 by Petr