
Fusing conduits with multiple inputs

I am trying to create a conduit that can consume multiple input streams. I need to be able to await on one or the other of the input streams in no particular order (e.g., not alternating), which makes zip useless. There is nothing parallel or non-deterministic going on here: I await on one stream or the other. I want to be able to write code similar to the following (where awaitA and awaitB await on the first and second input stream, respectively):

do
  _ <- awaitA
  x <- awaitA
  y <- awaitB
  yield (x,y)
  _ <- awaitB
  _ <- awaitB
  y' <- awaitB
  yield (x,y')

The best solution I have is to make the inner monad another conduit, e.g.

foo :: Sink i1 (ConduitM i2 o m) ()

Which then allows

awaitA = await
awaitB = lift await

And this mostly works. Unfortunately, this seems to make it very difficult to fuse to the inner conduit before the outer conduit is fully connected. The first thing I tried was:

fuseInner :: Monad m =>
                Conduit i2' m i2 -> 
                Sink i1 (ConduitM i2 o m) () -> 
                Sink i1 (ConduitM i2' o m) ()
fuseInner x = transPipe (x =$=)

But this doesn't work, at least when x is stateful, since (x =$=) is run multiple times, effectively restarting x each time.

Is there any way to write fuseInner, short of breaking into the internals of conduit (which looks like it would be pretty messy)? Is there some better way to handle multiple input streams? Am I just way too far beyond what conduit was designed for?

Thanks!

asked Mar 24 '13 02:03 by Benson


1 Answer

If you want to combine two IO-generated streams, then Gabriel's comment is the solution.

Otherwise, you can't wait on both streams to see which one produces a value first. Conduits are single-threaded and deterministic: only one pipe is processed at a time. But you could create a function that interleaves two streams, letting them decide when to switch:

{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
import Control.Monad (liftM)
import Data.Conduit.Internal (
    Pipe (..), Source,
    ConduitM (..),
    mapOutputMaybe
  )

-- | Alternate two given sources, running one until it yields `Nothing`,
-- then switching to the other one.
merge :: Monad m
      => Source m (Maybe a)
      -> Source m (Maybe b)
      -> Source m (Either a b)
merge (ConduitM l) (ConduitM r) = ConduitM $ goL l r
  where
    goL :: Monad m => Pipe () () (Maybe a) () m () 
                   -> Pipe () () (Maybe b) () m ()
                   -> Pipe () () (Either a b) () m ()
    goL (Leftover l ()) r           = goL l r
    goL (NeedInput _ c) r           = goL (c ()) r
    goL (PipeM mx) r                = PipeM $ liftM (`goL` r) mx
    goL (Done _) r                  = mapOutputMaybe (liftM Right) r
    goL (HaveOutput c f (Just o)) r = HaveOutput (goL c r) f (Left o)
    goL (HaveOutput c f Nothing) r  = goR c r
    -- This is just a mirror copy of goL. We should combine them together to
    -- avoid code repetition.
    goR :: Monad m => Pipe () () (Maybe a) () m ()
                   -> Pipe () () (Maybe b) () m ()
                   -> Pipe () () (Either a b) () m ()
    goR l (Leftover r ())           = goR l r
    goR l (NeedInput _ c)           = goR l (c ())
    goR l (PipeM mx)                = PipeM $ liftM (goR l) mx
    goR l (Done _)                  = mapOutputMaybe (liftM Left) l
    goR l (HaveOutput c f (Just o)) = HaveOutput (goR l c) f (Right o)
    goR l (HaveOutput c f Nothing)  = goL l c

It processes one source until it yields Nothing, then switches to the other, and so on. If one source finishes, the other one is processed to the end (any remaining Nothing markers are simply dropped).
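To make the switching rule concrete, here is a minimal pure sketch that models the same behaviour on plain lists instead of conduits. The name mergeModel is hypothetical and is not part of conduit; it only mirrors the case analysis of goL and goR above, with a list standing in for each source.

```haskell
-- Pure list model of the switching rule used by `merge` (illustration only).
-- `mergeModel` is a hypothetical name, not a conduit function.
mergeModel :: [Maybe a] -> [Maybe b] -> [Either a b]
mergeModel = goL
  where
    -- Reading from the left list.
    goL []             rs = [Right r | Just r <- rs]  -- left finished: drain right, dropping Nothing
    goL (Just x  : ls) rs = Left x : goL ls rs        -- emit and stay on the left
    goL (Nothing : ls) rs = goR ls rs                 -- Nothing means "switch to the right"
    -- Reading from the right list (mirror image of goL).
    goR ls []             = [Left l | Just l <- ls]   -- right finished: drain left, dropping Nothing
    goR ls (Just y  : rs) = Right y : goR ls rs       -- emit and stay on the right
    goR ls (Nothing : rs) = goL ls rs                 -- Nothing means "switch to the left"

-- A small input where each side decides when to hand over control.
example :: [Either Int Char]
example = mergeModel [Just 1, Just 2, Nothing, Just 3] [Just 'a', Nothing, Just 'b']
```

Loaded into GHCi, example should evaluate to [Left 1,Left 2,Right 'a',Left 3,Right 'b']: the left side runs until its first Nothing, the right side runs until its Nothing, and once the left side is exhausted the rest of the right side is drained.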

As an example, we can combine and interleave two lists:

import Control.Monad.Trans
import Data.Conduit (($$), awaitForever)
import Data.Conduit.List (sourceList)

main =  (merge (sourceList $ concatMap (\x -> [Just x, Just x, Nothing]) [  1..10])
               (sourceList $ concatMap (\x -> [Just x, Nothing]) [101..110]) )
         $$ awaitForever (\x -> lift $ print x)

If you need multiple sources, merge could be adapted to something like

mergeList :: Monad m => [Source m (Maybe a)] -> Source m a

which would cycle through the given list of sources until all of them are finished.
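As a rough illustration of the intended cycling behaviour, the following sketch models mergeList on plain lists rather than on conduit sources. The name mergeListModel is hypothetical, and the rotation order (a source that yields Nothing goes to the back of the queue) is an assumption about what "cycle through" should mean; a real implementation would walk Pipe constructors the way merge does.

```haskell
-- Pure list model of a round-robin `mergeList` (illustration only).
-- `mergeListModel` is a hypothetical name; each inner list stands in for one source.
mergeListModel :: [[Maybe a]] -> [a]
mergeListModel = go
  where
    go []                      = []                  -- all sources finished
    go ([] : rest)             = go rest             -- current source finished: drop it
    go ((Just x  : xs) : rest) = x : go (xs : rest)  -- emit and stay on the current source
    go ((Nothing : xs) : rest) = go (rest ++ [xs])   -- switch: move current source to the back

-- Three sources of different lengths.
exampleList :: [Int]
exampleList = mergeListModel
  [ [Just 1, Nothing, Just 2]
  , [Just 10, Just 11, Nothing]
  , [Just 20]
  ]
```

In GHCi, exampleList should evaluate to [1,10,11,20,2]. With exactly two sources this behaves like merge without the Either tags: when only one source is left, its Nothing markers have no effect.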

answered Oct 01 '22 02:10 by Petr