 

Merging sources using Haskell Conduit

Is it possible to build a function (say zipC2) in Conduit that would turn the following sources:

series1 = yieldMany [2, 4, 6, 8, 16 :: Int]

series2 = yieldMany [1, 5, 6 :: Int]

into one that would produce the following pairs (shown here as a list):

[(Nothing, Just 1), (Just 2, Just 1), (Just 4, Just 1), (Just 4, Just 5), (Just 6, Just 6), (Just 8, Just 6), (Just 16, Just 6)]

It would be called with a comparison function in the following way:

runConduitPure (zipC2 (<=) series1 series2 .| sinkList)

Previous versions had a mergeSources function that did something similar (though without the memory effect), but it is gone from the most recent version (1.3.1).

Clarification of how the function works: the idea is to take two sources, A (generating values a) and B (generating values b).

We then generate pairs:

If a < b, we first build (Just a, Nothing).

If b < a, it yields (Nothing, Just b).

If a == b, we update both sides and produce (Just a, Just b).

The value from the source that was not updated is not consumed and is used for the next round of comparisons. Only updated values are consumed.

We then keep updating the pair according to how the next values from A and B compare.

In other words: we update the left side of the pair if a < b, the right side if b < a, or both sides if a == b. Any unconsumed value is kept in memory for the next round of comparison.
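As a reference for the rules above, here is a small sketch of the same semantics on plain lists, which can be handy for checking a Conduit implementation against. zipLists is a name introduced here for illustration; le stands for the comparison function passed to zipC2 (e.g. (<=)), and two values are treated as equal when le a b and le b a both hold.

```haskell
-- Hypothetical pure-list model of zipC2: `le` plays the role of (<=),
-- and two values count as equal when le a b && le b a.
zipLists :: (a -> a -> Bool) -> [a] -> [a] -> [(Maybe a, Maybe a)]
zipLists le = go (Nothing, Nothing)
  where
    -- (l, r) is the last pair emitted, i.e. the values kept in memory.
    go _ [] [] = []
    go (_, r) (a : as') [] = emit (Just a, r) as' []
    go (l, _) [] (b : bs') = emit (l, Just b) [] bs'
    go (l, r) xs@(a : as') ys@(b : bs')
      | le a b && le b a = emit (Just a, Just b) as' bs'  -- equal: consume both
      | le a b = emit (Just a, r) as' ys                  -- a < b: consume a only
      | otherwise = emit (l, Just b) xs bs'               -- b < a: consume b only
    -- Output the new pair and carry it forward as the remembered state.
    emit p xs ys = p : go p xs ys

main :: IO ()
main = print (zipLists (<=) [2, 4, 6, 8, 16] [1, 5, 6 :: Int])
-- [(Nothing,Just 1),(Just 2,Just 1),(Just 4,Just 1),(Just 4,Just 5),(Just 6,Just 6),(Just 8,Just 6),(Just 16,Just 6)]
```

If neither le a b nor le b a holds (a non-total comparator), this sketch simply treats b as the smaller value; the question does not specify that case.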

Christophe asked Dec 06 '25 18:12


2 Answers

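The code below works (I called the function mergeSort), with one difference from the requested output: equal values from the two sources come out of the merge as two separate items, so an extra intermediate pair appears. With the example sources it yields (Just 6, Just 5) before (Just 6, Just 6), giving 8 pairs instead of 7.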

module Data.Conduit.Merge where

import Prelude (Monad, Bool, Maybe(..), Show, Eq)
import Prelude (otherwise, return)
import Prelude (($))
import Conduit (ConduitT)
import Conduit (evalStateC, mapC, yield, await)
import Conduit ((.|))
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.State.Strict (StateT, get, put)

import qualified Data.Conduit.Internal as CI

-- | Takes two sources and merges them.
-- This comes from https://github.com/luispedro/conduit-algorithms made available thanks to Luis Pedro Coelho.
mergeC2 :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> ConduitT () a m ()
mergeC2 comparator (CI.ConduitT s1) (CI.ConduitT s2) = CI.ConduitT $  processMergeC2 comparator s1 s2

processMergeC2 :: Monad m => (a -> a -> Bool)
                        -> ((() -> CI.Pipe () () a () m ()) -> CI.Pipe () () a () m ()) -- s1    ConduitT () a m ()
                        -> ((() -> CI.Pipe () () a () m ()) -> CI.Pipe () () a () m ()) -- s2    ConduitT () a m ()
                        -> ((() -> CI.Pipe () () a () m b ) -> CI.Pipe () () a () m b ) -- rest  ConduitT () a m ()
processMergeC2 comparator s1 s2 rest = go (s1 CI.Done) (s2 CI.Done)
    where
        go s1''@(CI.HaveOutput s1' v1) s2''@(CI.HaveOutput s2' v2)  -- s1''@ and s2''@ simply name the pattern expressions
            | comparator v1 v2 = CI.HaveOutput (go s1' s2'') v1
            | otherwise = CI.HaveOutput (go s1'' s2') v2
        go s1'@CI.Done{} (CI.HaveOutput s v) = CI.HaveOutput (go s1' s) v
        go (CI.HaveOutput s v) s2'@CI.Done{}  = CI.HaveOutput (go s s2') v
        go CI.Done{} CI.Done{} = rest ()
        -- Run monadic steps and skip input/leftover requests on either side.
        go (CI.PipeM p) other = do
            next <- lift p
            go next other
        go other (CI.PipeM p) = do
            next <- lift p
            go other next
        go (CI.NeedInput _ next) other = go (next ()) other
        go other (CI.NeedInput _ next) = go other (next ())
        go (CI.Leftover next ()) other = go next other
        go other (CI.Leftover next ()) = go other next

data MergeTag = LeftItem | RightItem deriving (Show, Eq)
data TaggedItem a = TaggedItem MergeTag a deriving (Show, Eq)
mergeTag :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> ConduitT () (TaggedItem a) m ()
mergeTag func series1 series2 = mergeC2 (tagSort func) taggedSeries1 taggedSeries2
                where
                    taggedSeries1 = series1 .| mapC (\item -> TaggedItem LeftItem item)
                    taggedSeries2 = series2 .| mapC (\item -> TaggedItem RightItem item)
                    tagSort :: (a -> a -> Bool) -> TaggedItem a -> TaggedItem a -> Bool
                    tagSort f (TaggedItem _ item1) (TaggedItem _ item2) = f item1 item2

type StateMergePair a = (Maybe a, Maybe a)
pairTagC :: (Monad m) => ConduitT  (TaggedItem a) (StateMergePair a) (StateT (StateMergePair a) m) ()
pairTagC = do
    input <- await
    case input of
        Nothing -> return ()
        Just taggedItem -> do
            stateMergePair <- lift get
            let outputState = updateStateMergePair taggedItem stateMergePair
            lift $ put outputState
            yield outputState
            pairTagC

-- Replace the side named by the tag and keep the other side's last value.
updateStateMergePair :: TaggedItem a -> StateMergePair a -> StateMergePair a
updateStateMergePair (TaggedItem LeftItem item) (_, rightItem) = (Just item, rightItem)
updateStateMergePair (TaggedItem RightItem item) (leftItem, _) = (leftItem, Just item)

pairTag :: (Monad m) => ConduitT  (TaggedItem a) (StateMergePair a) m ()
pairTag = evalStateC (Nothing, Nothing) pairTagC

mergeSort :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> ConduitT () (StateMergePair a) m ()
mergeSort func series1 series2 = mergeTag func series1 series2 .| pairTag

I borrowed the mergeC2 function from https://github.com/luispedro/conduit-algorithms ...

I am only a beginner in Haskell, so the code is certainly not optimal.
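To see what the two stages do on the example data, here is a hypothetical list-level model (Side, mergeTagged and pairUp are names introduced here, not part of conduit or of the code above). The first stage mirrors mergeC2/mergeTag; the second mirrors pairTagC as a scanl over the tagged stream.

```haskell
-- Hypothetical tags; they mirror LeftItem/RightItem in the answer.
data Side = L | R deriving (Show, Eq)

-- Stage 1 (like mergeC2 + mergeTag): merge two sorted lists, tagging each
-- element with its origin. The left element is taken whenever le x y holds,
-- so on ties it comes first.
mergeTagged :: (a -> a -> Bool) -> [a] -> [a] -> [(Side, a)]
mergeTagged _ xs [] = map (\x -> (L, x)) xs
mergeTagged _ [] ys = map (\y -> (R, y)) ys
mergeTagged le xs@(x : xs') ys@(y : ys')
  | le x y = (L, x) : mergeTagged le xs' ys
  | otherwise = (R, y) : mergeTagged le xs ys'

-- Stage 2 (like pairTagC): update only the side named by the tag,
-- keeping the other side's last value; one output pair per input item.
pairUp :: [(Side, a)] -> [(Maybe a, Maybe a)]
pairUp = drop 1 . scanl step (Nothing, Nothing)
  where
    step (_, r) (L, x) = (Just x, r)
    step (l, _) (R, y) = (l, Just y)

main :: IO ()
main = print (pairUp (mergeTagged (<=) [2, 4, 6, 8, 16] [1, 5, 6 :: Int]))
-- [(Nothing,Just 1),(Just 2,Just 1),(Just 4,Just 1),(Just 4,Just 5),(Just 6,Just 5),(Just 6,Just 6),(Just 8,Just 6),(Just 16,Just 6)]
```

Since the pairing stage emits one pair per tagged item, the two 6s produce two pairs; collapsing them would need the merge stage to recognise equal heads and emit them as a single item.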

Nathan François answered Dec 09 '25 06:12


I've managed to create your zipC2 function:

import Conduit

zipC2Def :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> (Maybe a, Maybe a) -> ConduitT () (Maybe a, Maybe a) m ()
zipC2Def f c1 c2 (s1, s2) = do
  ma <- c1 .| peekC
  mb <- c2 .| peekC
  case (ma, mb) of
    (Just a, Just b) ->
      case (f a b, f b a) of
        (True, True) -> do
          yield (ma, mb)
          zipC2Def f (c1 .| drop1) (c2 .| drop1) (ma, mb)
        (_, True) -> do
          yield (s1, mb)
          zipC2Def f c1 (c2 .| drop1) (s1, mb)
        (True, _) -> do
          yield (ma, s2)
          zipC2Def f (c1 .| drop1) c2 (ma, s2)
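        _ ->
          -- Neither f a b nor f b a holds (only possible with a non-total
          -- comparator): both values are skipped without emitting a pair.
          zipC2Def f (c1 .| drop1) (c2 .| drop1) (ma, s2)
    (Just _, Nothing) -> do
      yield (ma, s2)
      zipC2Def f (c1 .| drop1) c2 (ma, s2)
    (Nothing, Just _) -> do
      yield (s1, mb)
      zipC2Def f c1 (c2 .| drop1) (s1, mb)
    _ -> return ()
  where
    -- Skip the first element and pass the rest through unchanged.
    drop1 = dropC 1 >> awaitForever yield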

zipC2 :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> ConduitT () (Maybe a, Maybe a) m ()
zipC2 f c1 c2 = zipC2Def f c1 c2 (Nothing, Nothing)

main :: IO ()
main = print $ runConduitPure $ zipC2 (<=) series1 series2 .| sinkList
  where
    series1 = yieldMany [2, 4, 6, 8, 16 :: Int] :: ConduitT () Int Identity ()
    series2 = yieldMany [1, 5, 6 :: Int] :: ConduitT () Int Identity ()

Output:

[(Nothing,Just 1),(Just 2,Just 1),(Just 4,Just 1),(Just 4,Just 5),(Just 6,Just 6),(Just 8,Just 6),(Just 16,Just 6)]
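In this version each recursive step passes c1 .| drop1 (or c2 .| drop1), so every peekC replays the source from its start and discards the elements already seen. That is fine for small pure sources, but the work grows quadratically with the input and an effectful source (one reading a file, say) would repeat its effects. Below is a sketch that pulls each element exactly once by keeping each source suspended between steps. It assumes conduit 1.3's connect-and-resume API (SealedConduitT, $$+ and $$++, re-exported by the Conduit module); advance, Cursor and zipC2Resumable are hypothetical names introduced here.

```haskell
import Conduit

-- Hypothetical: a suspended source paired with its current head
-- (Nothing once the source is exhausted).
type Cursor m a = (SealedConduitT () a m (), Maybe a)

-- Hypothetical helper: pull one element and keep the rest of the source
-- suspended, so it is resumed later rather than replayed from the start.
advance :: Monad m => SealedConduitT () a m () -> m (Cursor m a)
advance src = src $$++ await

zipC2Resumable
  :: Monad m
  => (a -> a -> Bool)
  -> ConduitT () a m ()
  -> ConduitT () a m ()
  -> ConduitT () (Maybe a, Maybe a) m ()
zipC2Resumable le c1 c2 = do
  cur1 <- lift (c1 $$+ await)
  cur2 <- lift (c2 $$+ await)
  go (Nothing, Nothing) cur1 cur2
  where
    -- (l, r) is the last pair emitted, i.e. the values kept in memory.
    go (l, r) cur1@(s1, ma) cur2@(s2, mb) =
      case (ma, mb) of
        (Nothing, Nothing) -> return ()
        (Just a, Nothing) -> takeLeft a
        (Nothing, Just b) -> takeRight b
        (Just a, Just b)
          | le a b && le b a -> do   -- equal: consume both heads
              let p = (Just a, Just b)
              yield p
              next1 <- lift (advance s1)
              next2 <- lift (advance s2)
              go p next1 next2
          | le a b -> takeLeft a     -- a < b: consume the left head only
          | otherwise -> takeRight b -- b < a: consume the right head only
      where
        takeLeft a = do
          let p = (Just a, r)
          yield p
          next1 <- lift (advance s1)
          go p next1 cur2
        takeRight b = do
          let p = (l, Just b)
          yield p
          next2 <- lift (advance s2)
          go p cur1 next2

main :: IO ()
main = print $ runConduitPure $
  zipC2Resumable (<=) (yieldMany [2, 4, 6, 8, 16 :: Int]) (yieldMany [1, 5, 6 :: Int]) .| sinkList
```

Run on the example sources, it is meant to print the same list as above; the difference is only in how the sources are consumed.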

Karol Samborski answered Dec 09 '25 08:12


