A view days ago, I asked this question. Now I need a pure single threaded version of this function:
To repeat, I need a function that sends each received value to each sink and collects their results. The type signature of the function should be something like this:
broadcast :: [Sink a m b] -> Sink a m [b]
Best Sven
P.S. It is not sequence
, I've tried that:
> C.sourceList [1..100] $$ sequence [C.fold (+) 0, C.fold (+) 0]
[5050, 0]
expected result:
[5050, 5050]
P.P.S. zipSinks
gives the desired result, but it works just with tuples:
> C.sourceList [1..100] $$ C.zipSinks (C.fold (+) 0) (C.fold (+) 0)
(5050, 5050)
Basically all we need to do is to reimplement sequence
, but with zipSinks
instead of the original sequencing operation:
import Data.Conduit as C
import Data.Conduit.List as C
import Data.Conduit.Util as C
fromPairs
:: (Functor f)
=> f [a] -- ^ an empty list to start with
-> (f a -> f [a] -> f (a, [a])) -- ^ a combining function
-> [f a] -- ^ input list
-> f [a] -- ^ combined list
fromPairs empty comb = g
where
g [] = empty
g (x:xs) = uncurry (:) `fmap` (x `comb` g xs)
Now creating broadcast
is just applying fromPairs
to zipSinks
:
broadcast :: (Monad m) => [Sink a m b] -> Sink a m [b]
broadcast = fromPairs (return []) zipSinks
And we can do something like
main = C.sourceList [1..100] $$ broadcast [C.fold (+) 0, C.fold (*) 1]
Update: We can see that fromPairs
looks just sequenceA
and so we can push the idea even further. Let's define a zipping applicative functor on conduits similar to ZipList
:
import Control.Applicative
import Control.Monad
import Data.Conduit
import Data.Conduit.Util
import Data.Traversable (Traversable(..), sequenceA)
newtype ZipSink i m r = ZipSink { getZipSink :: Sink i m r }
instance Monad m => Functor (ZipSink i m) where
fmap f (ZipSink x) = ZipSink (liftM f x)
instance Monad m => Applicative (ZipSink i m) where
pure = ZipSink . return
(ZipSink f) <*> (ZipSink x) =
ZipSink $ liftM (uncurry ($)) $ zipSinks f x
Then broadcast
becomes as simple as
broadcast :: (Traversable f, Monad m) => f (Sink i m r) -> Sink i m (f r)
broadcast = getZipSink . sequenceA . fmap ZipSink
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With