Is it possible to build a function (say zipC2) in Conduit that would turn the following sources:
series1 = yieldMany [2, 4, 6, 8, 16 :: Int]
series2 = yieldMany [1, 5, 6 :: Int]
into one that would produce the following pairs (shown here as list):
[(Nothing, Just 1), (Just 2, Just 1), (Just 4, Just 1), (Just 4, Just 5), (Just 6, Just 6), (Just 8, Just 6), (Just 16, Just 6)]
It would be called with a comparison function in the following way:
runConduitPure ( zipC2 (<=) series1 series2 .| sinkList )
There used to be a mergeSources function in previous versions that did something relatively similar (without a memory effect though), but it disappeared in the most recent version (1.3.1).
Clarification about how the function works: The idea is to take 2 sources A (generating values a) and B (generating values b).
We then generate pairs:
If a < b, we update the left side; on the first round this yields (Just a, Nothing)
If b < a, we update the right side; on the first round this yields (Nothing, Just b)
If a == b, we update both sides and yield (Just a, Just b)
The value from the source that was not updated is not consumed and is used for the next round of comparisons. Only updated values are consumed.
We then keep updating the pair, according to the values from A and B relative to each other.
In other words: we update the left side of the pair if a < b, the right side if b < a, or both sides if a == b. Any unconsumed value is kept in memory for the next round of comparison.
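The rule above can be modelled on plain lists before bringing Conduit into it. The sketch below is only an illustration, not a Conduit solution: `zipLists` is a made-up name, and it assumes the comparator behaves like `(<=)` on a totally ordered type, so that "a == b" can be detected as "a <= b and b <= a".

```haskell
-- Pure list model of the pairing rule (zipLists is a hypothetical name).
-- Assumption: the comparator behaves like (<=) on a totally ordered type.
zipLists :: (a -> a -> Bool) -> [a] -> [a] -> [(Maybe a, Maybe a)]
zipLists le = go (Nothing, Nothing)
  where
    -- Both inputs exhausted: nothing more to emit.
    go _ [] [] = []
    -- Right side exhausted: drain the left side, keeping the last right value.
    go (_, r) (x:xs) [] =
      let p = (Just x, r) in p : go p xs []
    -- Left side exhausted: drain the right side, keeping the last left value.
    go (l, _) [] (y:ys) =
      let p = (l, Just y) in p : go p [] ys
    go (l, r) (x:xs) (y:ys)
      -- x == y: consume both values and update both sides.
      | le x y && le y x = let p = (Just x, Just y) in p : go p xs ys
      -- x < y: consume only x; y stays available for the next comparison.
      | le x y           = let p = (Just x, r) in p : go p xs (y:ys)
      -- y < x: consume only y; x stays available for the next comparison.
      | otherwise        = let p = (l, Just y) in p : go p (x:xs) ys
```

With the two series from the question, `zipLists (<=) [2, 4, 6, 8, 16] [1, 5, 6]` evaluates to the seven pairs listed at the top, starting with `(Nothing, Just 1)` and ending with `(Just 16, Just 6)`.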
The code below works as expected (I called the function mergeSort):
module Data.Conduit.Merge where
import Prelude (Monad, Bool, Maybe(..), Show, Eq)
import Prelude (otherwise, return)
import Prelude (($))
import Conduit (ConduitT)
import Conduit (evalStateC, mapC, yield, await)
import Conduit ((.|))
import Control.Monad.State (get, put, lift)
import Control.Monad.Trans.State.Strict (StateT)
import qualified Data.Conduit.Internal as CI
-- | Takes two sources and merges them.
-- This comes from https://github.com/luispedro/conduit-algorithms made available thanks to Luis Pedro Coelho.
mergeC2 :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> ConduitT () a m ()
mergeC2 comparator (CI.ConduitT s1) (CI.ConduitT s2) = CI.ConduitT $ processMergeC2 comparator s1 s2

processMergeC2 :: Monad m => (a -> a -> Bool)
               -> ((() -> CI.Pipe () () a () m ()) -> CI.Pipe () () a () m ()) -- s1 ConduitT () a m ()
               -> ((() -> CI.Pipe () () a () m ()) -> CI.Pipe () () a () m ()) -- s2 ConduitT () a m ()
               -> ((() -> CI.Pipe () () a () m b ) -> CI.Pipe () () a () m b ) -- rest ConduitT () a m ()
processMergeC2 comparator s1 s2 rest = go (s1 CI.Done) (s2 CI.Done)
  where
    go s1''@(CI.HaveOutput s1' v1) s2''@(CI.HaveOutput s2' v2) -- s1''@ and s2''@ simply name the pattern expressions
        | comparator v1 v2 = CI.HaveOutput (go s1' s2'') v1
        | otherwise = CI.HaveOutput (go s1'' s2') v2
    go s1'@CI.Done{} (CI.HaveOutput s v) = CI.HaveOutput (go s1' s) v
    go (CI.HaveOutput s v) s1'@CI.Done{} = CI.HaveOutput (go s s1') v
    go CI.Done{} CI.Done{} = rest ()
    go (CI.PipeM p) left = do
        next <- lift p
        go next left
    go right (CI.PipeM p) = do
        next <- lift p
        go right next
    go (CI.NeedInput _ next) left = go (next ()) left
    go right (CI.NeedInput _ next) = go right (next ())
    go (CI.Leftover next ()) left = go next left
    go right (CI.Leftover next ()) = go right next
data MergeTag = LeftItem | RightItem deriving (Show, Eq)
data TaggedItem a = TaggedItem MergeTag a deriving (Show, Eq)
mergeTag :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> ConduitT () (TaggedItem a) m ()
mergeTag func series1 series2 = mergeC2 (tagSort func) taggedSeries1 taggedSeries2
  where
    taggedSeries1 = series1 .| mapC (\item -> TaggedItem LeftItem item)
    taggedSeries2 = series2 .| mapC (\item -> TaggedItem RightItem item)
tagSort :: (a -> a -> Bool) -> TaggedItem a -> TaggedItem a -> Bool
tagSort f (TaggedItem _ item1) (TaggedItem _ item2) = f item1 item2
type StateMergePair a = (Maybe a, Maybe a)
pairTagC :: (Monad m) => ConduitT (TaggedItem a) (StateMergePair a) (StateT (StateMergePair a) m) ()
pairTagC = do
    input <- await
    case input of
        Nothing -> return ()
        Just taggedItem -> do
            stateMergePair <- lift get
            let outputState = updateStateMergePair taggedItem stateMergePair
            lift $ put outputState
            yield outputState
            pairTagC
updateStateMergePair :: TaggedItem a -> StateMergePair a -> StateMergePair a
updateStateMergePair (TaggedItem tag item) (Just leftItem, Just rightItem) = case tag of
    LeftItem -> (Just item, Just rightItem)
    RightItem -> (Just leftItem, Just item)
updateStateMergePair (TaggedItem tag item) (Nothing, Just rightItem) = case tag of
    LeftItem -> (Just item, Just rightItem)
    RightItem -> (Nothing, Just item)
updateStateMergePair (TaggedItem tag item) (Just leftItem, Nothing) = case tag of
    LeftItem -> (Just item, Nothing)
    RightItem -> (Just leftItem, Just item)
updateStateMergePair (TaggedItem tag item) (Nothing, Nothing) = case tag of
    LeftItem -> (Just item, Nothing)
    RightItem -> (Nothing, Just item)
pairTag :: (Monad m) => ConduitT (TaggedItem a) (StateMergePair a) m ()
pairTag = evalStateC (Nothing, Nothing) pairTagC
mergeSort :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> ConduitT () (StateMergePair a) m ()
mergeSort func series1 series2 = mergeTag func series1 series2 .| pairTag
I borrowed the mergeC2 function from https://github.com/luispedro/conduit-algorithms ...
I am only a beginner in Haskell so the code is certainly not optimal.
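To make the design of the code above easier to follow, here is a rough list-only model of its two stages: first merge the two series while tagging each element with its origin, then fold the tagged stream into running pairs. The names `Tag`, `mergeTagged`, `step` and `mergePairs` are invented for this sketch and are not part of Conduit or conduit-algorithms. `step` collapses the four cases of `updateStateMergePair` into two equations, since in every case a left item replaces the left slot and a right item replaces the right slot.

```haskell
-- List-only model of the "tag, merge, then track state" design.
-- All names here are hypothetical and exist only for illustration.
data Tag = L | R deriving (Show, Eq)

-- Merge two lists, tagging each element with the side it came from.
-- When the comparator holds, the left element is emitted first.
mergeTagged :: (a -> a -> Bool) -> [a] -> [a] -> [(Tag, a)]
mergeTagged _ [] ys = [(R, y) | y <- ys]
mergeTagged _ xs [] = [(L, x) | x <- xs]
mergeTagged le (x:xs) (y:ys)
  | le x y    = (L, x) : mergeTagged le xs (y:ys)
  | otherwise = (R, y) : mergeTagged le (x:xs) ys

-- Update the remembered pair with one tagged element.
step :: (Maybe a, Maybe a) -> (Tag, a) -> (Maybe a, Maybe a)
step (_, r) (L, x) = (Just x, r)
step (l, _) (R, y) = (l, Just y)

-- Running pairs; drop 1 discards the initial (Nothing, Nothing) state.
mergePairs :: (a -> a -> Bool) -> [a] -> [a] -> [(Maybe a, Maybe a)]
mergePairs le xs ys = drop 1 (scanl step (Nothing, Nothing) (mergeTagged le xs ys))
```

One property of this model is worth noting: because the merge stage emits one element at a time, equal values are not consumed together. With `(<=)` and the two series from the question, the model emits `(Just 6, Just 5)` immediately before `(Just 6, Just 6)`, giving eight pairs rather than seven.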
I've managed to create your zipC2 function:
import Data.Ord
import Conduit
import Control.Monad
zipC2Def :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> (Maybe a, Maybe a) -> ConduitT () (Maybe a, Maybe a) m ()
zipC2Def f c1 c2 (s1, s2) = do
    ma <- c1 .| peekC
    mb <- c2 .| peekC
    case (ma, mb) of
        (Just a, Just b) ->
            case (f a b, f b a) of
                (True, True) -> do
                    yield (ma, mb)
                    zipC2Def f (c1 .| drop1) (c2 .| drop1) (ma, mb)
                (_, True) -> do
                    yield (s1, mb)
                    zipC2Def f c1 (c2 .| drop1) (s1, mb)
                (True, _) -> do
                    yield (ma, s2)
                    zipC2Def f (c1 .| drop1) c2 (ma, s2)
                _ ->
                    zipC2Def f (c1 .| drop1) (c2 .| drop1) (ma, s2)
        (Just a, Nothing) -> do
            yield (ma, s2)
            zipC2Def f (c1 .| drop1) c2 (ma, s2)
        (Nothing, Just b) -> do
            yield (s1, mb)
            zipC2Def f c1 (c2 .| drop1) (s1, mb)
        _ -> return ()
  where
    drop1 = dropC 1 >> takeWhileC (const True)
zipC2 :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> ConduitT () (Maybe a, Maybe a) m ()
zipC2 f c1 c2 = zipC2Def f c1 c2 (Nothing, Nothing)
main :: IO ()
main =
    let
        series1 = yieldMany [2, 4, 6, 8, 16 :: Int] :: ConduitT () Int Identity ()
        series2 = yieldMany [1, 5, 6 :: Int] :: ConduitT () Int Identity ()
    in
        putStrLn $ show $ runConduitPure $
            (zipC2 (<=) series1 series2)
            .| sinkList
output:
[(Nothing,Just 1),(Just 2,Just 1),(Just 4,Just 1),(Just 4,Just 5),(Just 6,Just 6),(Just 8,Just 6),(Just 16,Just 6)]