
How to turn a pull based pipe into a push based one?

By default, pipes are pull based. This is due to the operator >->, which is implemented via +>>, the pointful bind operator for the pull category. My understanding is that this means that if you have code like producer >-> consumer, the consumer's body will be called first, and then, once it awaits data, the producer will be called.

I've seen in the pipes documentation here that you can use the code (reflect .) from Pipes.Core to turn a pull based pipe into a push based pipe. That would mean (correct me if I'm wrong) that in the code above, producer >-> consumer, the producer is run first and produces a value, and then the consumer tries to consume it. That seems really useful and I'd like to know how to do it.

I've also seen in discussions here that there is no push based counterpart to >-> because it is easy to turn any pipe around (I assume with reflect?), but I can't really figure out how to do it or find any examples.

Here's some code I've attempted:

import Control.Monad (forever)
import Pipes
import Pipes.Core

stdin :: Producer String IO r
stdin = forever $ do
  lift $ putStrLn "stdin"
  str <- lift getLine
  yield str

countLetters :: Consumer String IO r
countLetters = forever $ do
  lift $ putStrLn "countLetters"
  str <- await
  lift . putStrLn . show . length $ str

-- this works in pull mode
runEffect (stdin >-> countLetters)

-- equivalent to above, works
runEffect ((\() -> stdin) +>> countLetters)

-- push based operator, doesn't do what I hoped
runEffect (stdin >>~ (\_ -> countLetters))

-- does not compile
runEffect (countLetters >>~ (\() -> stdin))
David McHealy asked Dec 01 '16 21:12



1 Answer

-- push based operator, doesn't do what I hoped
runEffect (stdin >>~ (\_ -> countLetters))

I gather the problem here is that, while the producer is run first as expected, the first produced value is dropped. Compare...

GHCi> runEffect (stdin >-> countLetters)
countLetters
stdin
foo
3
countLetters
stdin
glub
4
countLetters
stdin

... with:

GHCi> runEffect (stdin >>~ (\_ -> countLetters))
stdin
foo
countLetters
stdin
glub
4
countLetters
stdin

This issue is discussed in detail by Gabriella Gonzalez's answer to this question. It boils down to how the argument to the function you give to (>>~) is the "driving" input in the push-based flow, and so if you const it away you end up dropping the first input. The solution is to reshape countLetters accordingly:

countLettersPush :: String -> Consumer String IO r
countLettersPush str = do
  lift $ putStrLn "countLetters"
  lift . putStrLn . show . length $ str
  str' <- await
  countLettersPush str'

GHCi> runEffect (stdin >>~ countLettersPush)
stdin
foo
countLetters
3
stdin
glub
countLetters
4
stdin
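
To make the difference in ordering reproducible without typing at a prompt, here is a minimal sketch that swaps IO for a Writer log. The names source, pullLog and pushLog, and the use of Writer from transformers, are assumptions made for this illustration; they are not part of the question's code.

```haskell
import Control.Monad (forever)
import Control.Monad.Trans.Writer.Strict (Writer, execWriter, tell)
import Pipes
import Pipes.Core ((>>~))

-- Hypothetical stand-in for the interactive 'stdin' producer: it logs
-- "stdin" before yielding each element of a fixed list.
source :: [String] -> Producer String (Writer [String]) ()
source = mapM_ $ \str -> do
  lift (tell ["stdin"])
  yield str

-- Pull-style consumer, shaped like 'countLetters' in the question.
countLetters :: Consumer String (Writer [String]) r
countLetters = forever $ do
  lift (tell ["countLetters"])
  str <- await
  lift (tell [show (length str)])

-- Push-style consumer, shaped like 'countLettersPush' above: the first
-- input arrives as the function argument, later ones through 'await'.
countLettersPush :: String -> Consumer String (Writer [String]) r
countLettersPush str = do
  lift (tell ["countLetters"])
  lift (tell [show (length str)])
  str' <- await
  countLettersPush str'

pullLog, pushLog :: [String]
-- ["countLetters","stdin","3","countLetters","stdin","4","countLetters"]
pullLog = execWriter (runEffect (source ["foo", "glub"] >-> countLetters))
-- ["stdin","countLetters","3","stdin","countLetters","4"]
pushLog = execWriter (runEffect (source ["foo", "glub"] >>~ countLettersPush))
```

Evaluating pullLog and pushLog in GHCi shows the consumer logging first in the pull version and the producer logging first in the push version, with no input dropped in either case.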

I've also seen in discussions here that there is no push based counterpart to >-> because it is easy to turn any pipe around (I assume with reflect?)

I'm not fully sure of my ground, but it seems that doesn't quite apply to the solution above. What we can do, now that the push-based flow works correctly, is use reflect to turn it back into a pull-based flow:

-- Preliminary step: switching to '(>~>)'.
stdin >>~ countLettersPush
(const stdin >~> countLettersPush) ()

-- Applying 'reflect', as the documentation suggests.
reflect . (const stdin >~> countLettersPush)
reflect . const stdin <+< reflect . countLettersPush
const (reflect stdin) <+< reflect . countLettersPush

-- Rewriting in terms of '(+>>)'.
(reflect . countLettersPush >+> const (reflect stdin)) ()
reflect . countLettersPush +>> reflect stdin
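
As a sanity check on the rewriting above, the following sketch runs both the push-based pipeline and its reflected form and compares their logs. As an assumption for testability, it replaces IO with a Writer log and the interactive producer with a fixed list; Log, source, pushLog and reflectedLog are illustrative names only.

```haskell
import Control.Monad.Trans.Writer.Strict (Writer, execWriter, tell)
import Pipes
import Pipes.Core (reflect, (+>>), (>>~))

type Log = Writer [String]

-- Hypothetical stand-in for 'stdin' that yields from a fixed list.
source :: [String] -> Producer String Log ()
source = mapM_ $ \str -> lift (tell ["stdin"]) >> yield str

-- Same shape as 'countLettersPush', logging instead of printing.
countLettersPush :: String -> Consumer String Log r
countLettersPush str = do
  lift (tell ["countLetters", show (length str)])
  await >>= countLettersPush

pushLog, reflectedLog :: [String]
-- The push-based composition.
pushLog =
  execWriter (runEffect (source ["foo", "glub"] >>~ countLettersPush))
-- The last line of the derivation, with 'source' in place of 'stdin'.
reflectedLog =
  execWriter (runEffect (reflect . countLettersPush +>> reflect (source ["foo", "glub"])))
-- Both evaluate to ["stdin","countLetters","3","stdin","countLetters","4"]
```

The two logs coincide, which is what one would expect if reflect only swaps the roles of request and respond without changing the order of effects.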

This is indeed pull-based, as the flow is driven by reflect stdin, the downstream Client:

GHCi> :t reflect stdin
reflect stdin :: Proxy String () () X IO r
GHCi> :t reflect stdin :: Client String () IO r
reflect stdin :: Client String () IO r :: Client String () IO r

The flow, however, involves sending Strings upstream, and so it cannot be expressed in terms of (>->), which is, so to speak, downstream-only:

GHCi> -- Compare the type of the second argument with that of 'reflect stdin'
GHCi> :t (>->)
(>->)
  :: Monad m =>
     Proxy a' a () b m r -> Proxy () b c' c m r -> Proxy a' a c' c m r
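
To make "sending Strings upstream" concrete, here is a sketch of roughly what the reflected pipeline looks like when written by hand with request and respond from Pipes.Core. The names lineClient, lengthServer and serverLog are hypothetical, and a Writer log over a fixed list stands in for IO so that the result can be inspected.

```haskell
import Control.Monad.Trans.Writer.Strict (Writer, execWriter, tell)
import Pipes
import Pipes.Core (Client, Server, request, respond, (+>>))

type Log = Writer [String]

-- Hand-written analogue of 'reflect stdin': each line is sent upstream
-- with 'request', and the client then waits for a () reply.
lineClient :: [String] -> Client String () Log ()
lineClient = mapM_ $ \str -> do
  lift (tell ["stdin"])
  request str

-- Hand-written analogue of 'reflect . countLettersPush': handle one
-- request, reply with (), and receive the next request in return.
lengthServer :: String -> Server String () Log r
lengthServer str = do
  lift (tell ["countLetters", show (length str)])
  str' <- respond ()
  lengthServer str'

serverLog :: [String]
-- ["stdin","countLetters","3","stdin","countLetters","4"]
serverLog = execWriter (runEffect (lengthServer +>> lineClient ["foo", "glub"]))
```

Here the Strings travel in the request direction, which (>->) fixes to (), so the composition has to go through (+>>).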
duplode answered Nov 08 '22 12:11
