
Join two consumers into a single consumer that returns multiple values?

I have been experimenting with the new pipes-http package and I had a thought. I have two parsers for a web page, one that returns line items and another a number from elsewhere in the page. When I grab the page, it'd be nice to string these parsers together and get their results at the same time from the same bytestring producer, rather than fetching the page twice or fetching all the html into memory and parsing it twice.

In other words, say you have two Consumers:

c1 :: Consumer a m r1
c2 :: Consumer a m r2

Is it possible to make a function like this:

combineConsumers :: Consumer a m r1 -> Consumer a m r2 -> Consumer a m (r1, r2)
combineConsumers = undefined

I have tried a few things, but I can't figure it out. I understand if it isn't possible, but it would be convenient.

Edit:

I'm sorry, it turns out that my experience with conduit-attoparsec led me to a wrong assumption about pipes-attoparsec, which caused me to ask the wrong question. pipes-attoparsec turns an attoparsec parser into a pipes Parser, whereas I had assumed it would return a pipes Consumer. That means I can't actually turn two attoparsec parsers into consumers that take text and return a result, and then use them with the plain old pipes ecosystem. I'm sorry, but I just don't understand pipes-parse.

Even though it doesn't help me, Arthur's answer is pretty much what I envisioned when I asked the question, and I'll probably end up using his solution in the future. In the meantime I'm just going to use conduit.

David McHealy asked Feb 13 '23 20:02



1 Answer

If the results are "monoidal" (that is, they can be combined through a Monoid instance), you can use the tee function from Pipes.Prelude in combination with WriterT.

{-# LANGUAGE OverloadedStrings #-}

import Data.Monoid
import Control.Monad
import Control.Monad.Writer
import Control.Monad.Writer.Class
import Pipes
import qualified Pipes.Prelude as P
import qualified Data.Text as T

textSource :: Producer T.Text IO ()
textSource = yield "foo" >> yield "bar" >> yield "foo" >> yield "nah"

counter :: Monoid w => T.Text 
                    -> (T.Text -> w) 
                    -> Consumer T.Text (WriterT w IO) ()
counter word inject = P.filter (==word) >-> P.mapM (tell . inject) >-> P.drain

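main :: IO ()
main = do
    -- tee feeds each chunk to the first counter and passes it on to the second;
    -- both counters record their tallies in the shared WriterT log.
    result <- runWriterT $ runEffect $
        hoist lift textSource >->
        P.tee (counter "foo" inject1) >-> counter "bar" inject2
    print result
    where
    -- Each injection fills one slot of the pair and leaves the other empty.
    inject1 _ = (,) (Sum 1) mempty
    inject2 _ = (,) mempty (Sum 1)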
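
If both results can be phrased as folds over the stream rather than as arbitrary Consumers, a related option is the applicative Fold type from the foldl package, which combines two folds into one that traverses the input once. The following is only a minimal sketch and swaps in a different technique from the tee/WriterT one above: it assumes the foldl package is installed, and countWord is a hypothetical helper introduced for illustration.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import qualified Control.Foldl as L
import qualified Data.Text as T
import Pipes
import qualified Pipes.Prelude as P

textSource :: Monad m => Producer T.Text m ()
textSource = yield "foo" >> yield "bar" >> yield "foo" >> yield "nah"

-- Hypothetical helper: counts how many chunks equal the given word.
countWord :: T.Text -> L.Fold T.Text Int
countWord word = L.premap (\w -> if w == word then 1 else 0) L.sum

main :: IO ()
main = do
    -- The Applicative instance of Fold pairs both counts in a single pass.
    result <- L.purely P.fold ((,) <$> countWord "foo" <*> countWord "bar") textSource
    print result  -- prints (2,1)
```

This only covers results that are folds; it does not help with parsers that need leftovers.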

Update: As mentioned in a comment, the real problem I see is that, in pipes, parsers aren't Consumers. And how can you run two parsers concurrently if they behave differently with regard to leftovers? What happens if one of the parsers wants to "un-draw" some text and the other parser doesn't?
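
To make the leftover issue concrete: in pipes-parse a Parser is a StateT layer over the remaining Producer, and draw and unDraw take an element from that Producer and push one back. Here is a minimal sketch, assuming the pipes-parse package; peekFirst is a hypothetical helper written for illustration, not something from the library.

```haskell
{-# LANGUAGE RankNTypes #-}

import Control.Monad.Trans.State.Strict (runStateT)
import Pipes
import qualified Pipes.Prelude as P
import Pipes.Parse (Parser, draw, unDraw)

-- Hypothetical helper: look at the first element, then push it back.
peekFirst :: Monad m => Parser a m (Maybe a)
peekFirst = do
    mx <- draw
    case mx of
        Nothing -> return Nothing
        Just x  -> unDraw x >> return (Just x)

main :: IO ()
main = do
    -- Running the parser returns its result plus the leftover Producer.
    (peeked, rest) <- runStateT peekFirst (each [1, 2, 3 :: Int])
    print peeked              -- prints Just 1
    remaining <- P.toListM rest
    print remaining           -- prints [1,2,3]
```

A Consumer has no way to hand an element back like this, which is why two parsers cannot simply share one upstream.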

One possible solution is to run the parsers in a truly concurrent manner, in different threads. The primitives in the pipes-concurrency package let you "duplicate" a Producer by writing the same data to two different mailboxes. And then each parser can do whatever it wants with its own copy of the producer. Here's an example which also uses the pipes-parse, pipes-attoparsec and async packages:

{-# LANGUAGE OverloadedStrings #-}

import Data.Monoid
import qualified Data.Text as T
import Data.Attoparsec.Text hiding (takeWhile)
import Data.Attoparsec.Combinator
import Control.Applicative
import Control.Monad
import Control.Monad.State.Strict
import Pipes
import qualified Pipes.Prelude as P
import qualified Pipes.Attoparsec as P
import qualified Pipes.Concurrent as P
import qualified Control.Concurrent.Async as A

parseChars :: Char -> Parser [Char] 
parseChars c = fmap mconcat $ 
    many (notChar c) *> many1 (some (char c) <* many (notChar c))

textSource :: Producer T.Text IO ()
textSource = yield "foo" >> yield "bar" >> yield "foo" >> yield "nah"

parseConc :: Producer T.Text IO () 
          -> Parser a 
          -> Parser b 
          -> IO (Either P.ParsingError a,Either P.ParsingError b)
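parseConc producer parser1 parser2 = do
    -- One mailbox per parser, so each parser gets its own copy of the input.
    (outbox1,inbox1,seal1) <- P.spawn' P.Unbounded
    (outbox2,inbox2,seal2) <- P.spawn' P.Unbounded
    -- Write every chunk to both mailboxes.
    feeding <- A.async $ runEffect $ producer >-> P.tee (P.toOutput outbox1) 
                                              >->        P.toOutput outbox2
    -- Seal both mailboxes once feeding ends, so the parsers see end of input.
    sealing <- A.async $ A.wait feeding >> P.atomically seal1 >> P.atomically seal2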
    r <- A.runConcurrently $ 
        (,) <$> (A.Concurrently $ parseInbox parser1 inbox1)
            <*> (A.Concurrently $ parseInbox parser2 inbox2)
    A.wait sealing
    return r 
    where
    parseInbox parser inbox = evalStateT (P.parse parser) (P.fromInput inbox)

main :: IO ()
main = do
    (Right a, Right b) <- parseConc textSource (parseChars 'o')  (parseChars 'a')
    putStrLn . show $ (a,b) 

The result is:

("oooo","aa")

I'm not sure how much overhead this approach introduces.
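
If the threads turn out to be a concern, one thread-free alternative is to rely on attoparsec's own incremental interface (parse and feed from Data.Attoparsec.Text): each parser keeps its unconsumed input inside its own Result, so both can be fed the same chunks in lockstep. This is only a sketch of a different technique and I have not measured it; feedBoth is a hypothetical helper, and a plain list of chunks stands in for the Producer.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Control.Applicative (many, some)
import qualified Data.Attoparsec.Text as A
import Data.List (foldl')
import qualified Data.Text as T

-- Same parser as in the example above.
parseChars :: Char -> A.Parser String
parseChars c = fmap mconcat $
    many (A.notChar c) *> A.many1 (some (A.char c) <* many (A.notChar c))

-- Hypothetical helper: feed every chunk to both parsers.
feedBoth :: A.Parser a -> A.Parser b -> [T.Text] -> (A.Result a, A.Result b)
feedBoth p1 p2 chunks = (A.feed r1 T.empty, A.feed r2 T.empty)  -- empty input signals the end
  where
    start = (A.parse p1 T.empty, A.parse p2 T.empty)
    -- Empty chunks are skipped because feeding one would signal end of input early.
    (r1, r2) = foldl' step start (filter (not . T.null) chunks)
    step (x, y) chunk = (A.feed x chunk, A.feed y chunk)

main :: IO ()
main = do
    let (ra, rb) = feedBoth (parseChars 'o') (parseChars 'a') ["foo", "bar", "foo", "nah"]
    print (A.maybeResult ra, A.maybeResult rb)  -- prints (Just "oooo",Just "aa")
```

The same step function could be driven from a Producer with P.fold instead of a list.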

danidiaz answered Apr 19 '23 23:04
