Say I have a simple producer/consumer model where the consumer wants to pass some state back to the producer. For instance, let the downstream-flowing objects be objects we want to write to a file and the upstream objects be some token representing where the object was written in the file (e.g. an offset).
These two processes might look something like this (with pipes-4.0):
{-# LANGUAGE GeneralizedNewtypeDeriving #-}

import Pipes
import Pipes.Core
import Control.Monad.Trans.State
import Control.Monad

newtype Object = Obj Int
               deriving (Show)

newtype ObjectId = ObjId Int
                 deriving (Show, Num)

writeObjects :: Proxy ObjectId Object () X IO r
writeObjects = evalStateT (forever go) (ObjId 0)
  where go = do i <- get
                obj <- lift $ request i
                lift $ lift $ putStrLn $ "Wrote "++show obj
                modify (+1)

produceObjects :: [Object] -> Proxy X () ObjectId Object IO ()
produceObjects = go
  where go [] = return ()
        go (obj:rest) = do
            lift $ putStrLn $ "Producing "++show obj
            objId <- respond obj
            lift $ putStrLn $ "Object "++show obj++" has ID "++show objId
            go rest

objects = [ Obj i | i <- [0..10] ]
Simple as this might be, I've had a fair bit of difficulty reasoning about how to compose them. Ideally, we'd want a push-based flow of control like the following,
1. writeObjects starts by blocking on request, having sent the initial ObjId 0 upstream.
2. produceObjects sends the first object, Obj 0, downstream.
3. writeObjects writes the object and increments its state, and waits on request, this time sending ObjId 1 upstream.
4. respond in produceObjects returns with ObjId 0.
5. produceObjects continues at Step (2) with the second object, Obj 1.
My initial attempt was with push-based composition as follows,
main = void $ run $ produceObjects objects >>~ const writeObjects
Note the use of const to work around the otherwise incompatible types (this is likely where the problem lies). In this case, however, we find that ObjId 0 gets eaten,
Producing Obj 0
Wrote Obj 0
Object Obj 0 has ID ObjId 1
Producing Obj 1
...
A pull-based approach,
main = void $ run $ const (produceObjects objects) +>> writeObjects
suffers a similar issue, this time dropping Obj 0.
How might one go about composing these pieces in the desired manner?
The choice of which composition to use depends on which component should initiate the entire process. If you want the downstream pipe to initiate the process then you want to use pull-based composition (i.e. (>+>)/(+>>)), but if you want the upstream pipe to initiate the process then you should use push-based composition (i.e. (>>~)/(>~>)). The type errors you got were actually warning you that there is a logical error in your code: you haven't clearly established which component initiates the process.
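As a rough illustration of the two choices, the sketch below pins down the general shapes of the two operators by giving them local aliases and runs each on a toy pipeline in Identity. The names pushCompose, pullCompose, demoPush and demoPull are made up for this example and are not part of pipes. It assumes a released pipes-4.x, where the runner is called runEffect.

```haskell
import Data.Functor.Identity (runIdentity)
import Pipes.Core

-- Push-based: the upstream proxy takes no argument, so it runs first.
-- The downstream side is a function waiting for the first value `b`.
pushCompose :: Monad m
            => Proxy a' a b' b m r
            -> (b -> Proxy b' b c' c m r)
            -> Proxy a' a c' c m r
pushCompose = (>>~)

-- Pull-based: the downstream proxy takes no argument, so it runs first.
-- The upstream side is a function waiting for the first request `b'`.
pullCompose :: Monad m
            => (b' -> Proxy a' a b' b m r)
            -> Proxy b' b c' c m r
            -> Proxy a' a c' c m r
pullCompose = (+>>)

-- Upstream starts by responding with 20; downstream replies with 21,
-- which becomes the result of upstream's `respond`.
demoPush :: Int
demoPush = runIdentity . runEffect $
    pushCompose (respond (20 :: Int)) (\x -> request (x + 1))

-- Downstream starts by requesting with 5; upstream answers with 10,
-- which becomes the result of downstream's `request`.
demoPull :: Int
demoPull = runIdentity . runEffect $
    pullCompose (\n -> respond (n * 2)) (request (5 :: Int))

main :: IO ()
main = do
    print demoPush  -- 21
    print demoPull  -- 10
```

In both cases exactly one side is a plain proxy and the other is a function, and the plain proxy is the one that gets to run first.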
From your description, it's obvious that you want control flow to begin from produceObjects so you want to use push-based composition. Once you use push-based composition, the type of the composition operator will tell you everything you need to know about how to fix your code. I'll take its type and specialize it to your composition chain:
-- Here I'm using the `Server` and `Client` type synonyms to simplify the types
(>>~) :: Server ObjectId Object IO ()
      -> (Object -> Client ObjectId Object IO ())
      -> Effect IO ()
As you already noticed, the type error you got when you tried to use (>>~) told you that you were missing an argument of type Object to your writeObjects function. This statically enforces that you cannot run any code in writeObjects before receiving your first Object (through the initial argument).
The solution is to rewrite your writeObjects function like this:
writeObjects :: Object -> Proxy ObjectId Object () X IO r
writeObjects obj0 = evalStateT (go obj0) (ObjId 0)
  where go obj = do i <- get
                    lift $ lift $ putStrLn $ "Wrote "++ show obj
                    modify (+1)
                    obj' <- lift $ request i
                    go obj'
This then gives the correct behavior:
>>> run $ produceObjects objects >>~ writeObjects
Producing Obj 0
Wrote Obj 0
Object Obj 0 has ID ObjId 0
Producing Obj 1
Wrote Obj 1
Object Obj 1 has ID ObjId 1
Producing Obj 2
Wrote Obj 2
Object Obj 2 has ID ObjId 2
Producing Obj 3
Wrote Obj 3
Object Obj 3 has ID ObjId 3
Producing Obj 4
Wrote Obj 4
Object Obj 4 has ID ObjId 4
Producing Obj 5
Wrote Obj 5
Object Obj 5 has ID ObjId 5
Producing Obj 6
Wrote Obj 6
Object Obj 6 has ID ObjId 6
Producing Obj 7
Wrote Obj 7
Object Obj 7 has ID ObjId 7
Producing Obj 8
Wrote Obj 8
Object Obj 8 has ID ObjId 8
Producing Obj 9
Wrote Obj 9
Object Obj 9 has ID ObjId 9
Producing Obj 10
Wrote Obj 10
Object Obj 10 has ID ObjId 10
You might wonder why this requirement that one of the two pipes takes an initial argument makes sense, other than the abstract justification that this is what the category laws require. The plain English explanation is that the alternative would require buffering the first transmitted Object "in between" the two pipes until writeObjects reached its first request statement. This approach produces a lot of problematic behavior and buggy corner cases, but probably the most significant problem is that pipe composition would no longer be associative and the order of effects would change based on the order in which you composed things.
The nice thing about the bidirectional pipe composition operators is that the types work out so that you can always deduce whether a component is "active" (i.e. initiates control) or "passive" (i.e. waits for input) purely by studying the type. If composition says that a certain pipe (like writeObjects) must take an argument, then it's passive. If it takes no argument (like produceObjects), then it's active and initiates control. So composition forces you to have at most one active pipe within your pipeline (the pipe that doesn't take an initial argument) and that's the pipe that begins control.
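To see the same rule from the other direction, here is a minimal sketch of the pull-based dual, where the writer is active and the producer is passive. The names produceObjectsFrom and writeObjectsActive are made up for this example. Note that this changes the protocol from the one in the question: the producer learns the ID before the object is written rather than after, so it is only meant to show how the argument position marks the passive side. It assumes a released pipes-4.x with runEffect.

```haskell
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
import Pipes
import Pipes.Core

newtype Object = Obj Int deriving (Show)
newtype ObjectId = ObjId Int deriving (Show, Num)

-- Passive: takes the first ObjectId as an argument, so it cannot run
-- any code before the writer has issued its first request.
produceObjectsFrom :: [Object] -> ObjectId -> Server ObjectId Object IO ()
produceObjectsFrom []         _     = return ()
produceObjectsFrom (obj:rest) objId = do
    lift $ putStrLn $ "Object " ++ show obj ++ " will be written at " ++ show objId
    objId' <- respond obj          -- hand over the object, wait for the next ID
    produceObjectsFrom rest objId'

-- Active: takes no argument, so control starts here.
writeObjectsActive :: Client ObjectId Object IO r
writeObjectsActive = go 0
  where
    go objId = do
        obj <- request objId       -- send the ID upstream, wait for an object
        lift $ putStrLn $ "Wrote " ++ show obj
        go (objId + 1)

main :: IO ()
main = runEffect $ produceObjectsFrom [Obj 0, Obj 1] +>> writeObjectsActive
```

Here no const is needed on either side, because the component that takes the extra argument is the one the operator expects to be passive.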
The uses of const are where you are dropping the data. In order to get all the data, you probably want to do a push-based workflow as follows:
writeObjects :: Object -> Proxy ObjectId Object () X IO r
writeObjects obj = go 0 obj
  where
    go objid obj = do
        lift $ putStrLn $ "Wrote "++show obj
        obj' <- request objid
        go (objid + 1) obj'

-- produceObjects as before

main = void $ run $ produceObjects objects >>~ writeObjects
We've been discussing this on the mailing list, but I figured I'd throw it up here as well for those who are interested.
Your problem is that you have two coroutines which are both ready to spit out values at each other. Neither one needs the input of the other in order to produce a value. So who gets to go first? Well you said it yourself:
writeObjects starts by blocking on request, having sent the initial ObjId 0 upstream
Okay then, that means we need to delay produceObjects so that it waits for an ObjId signal before spitting out the corresponding object (even though it apparently doesn't need said ID).
Dipping into Proxy internals, here is the magic incantation which I will not bother to explain very carefully at this time. The basic idea is just to take input before you need it, then apply the input when needed, but then pretend like you need a new input (even though you don't need that one just yet):
-- Needs the Proxy constructors: import Pipes.Internal (Proxy(..))
delayD :: (Monad m) => Proxy a' a b' b m r -> b' -> Proxy a' a b' b m r
delayD p0 b' = case p0 of
    Request a' f -> Request a' (go . f)
    Respond b  g -> Respond b (delayD (g b'))
    M m          -> M (liftM go m)
    Pure r       -> Pure r
  where
    go p = delayD p b'
Now, you can use this on produceObjects objects instead of const, and your second attempt works as desired:
delayD (produceObjects objects) +>> writeObjects
We're discussing delayD on the mailing list to see if it merits inclusion in the standard Pipes repertoire.
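To make the difference between const and delayD concrete, here is a small pure sketch that runs the same toy pair of proxies both ways in Identity. The server and client definitions are made up for this example (they stand in for produceObjects and writeObjects), and it assumes a released pipes-4.x where the Proxy constructors are exported from Pipes.Internal and the runner is runEffect.

```haskell
import Control.Monad (liftM)
import Data.Functor.Identity (Identity, runIdentity)
import Pipes.Core
import Pipes.Internal (Proxy (..))

-- delayD as given above: hold on to each upstream-bound value and hand
-- it over one `respond` later.
delayD :: Monad m => Proxy a' a b' b m r -> b' -> Proxy a' a b' b m r
delayD p0 b' = case p0 of
    Request a' f -> Request a' (go . f)
    Respond b  g -> Respond b (delayD (g b'))
    M m          -> M (liftM go m)
    Pure r       -> Pure r
  where
    go p = delayD p b'

-- Toy server: responds twice and returns the two replies it saw.
server :: Server Int Char Identity [Int]
server = do
    i <- respond 'a'
    j <- respond 'b'
    return [i, j]

-- Toy client: sends 10, 20 and 30 upstream with successive requests.
client :: Client Int Char Identity [Int]
client = do
    _ <- request 10
    _ <- request 20
    _ <- request 30
    return []

withConst, withDelay :: [Int]
withConst = runIdentity (runEffect (const server +>> client))
withDelay = runIdentity (runEffect (delayD server +>> client))

main :: IO ()
main = do
    print withConst  -- [20,30]: the first value, 10, is thrown away by const
    print withDelay  -- [10,20]: the first value is kept and delivered late
```

With const the first upstream-bound value never reaches the server, which is the "eaten" ObjId 0 from the question. With delayD each value arrives one respond later, so the first reply the server sees is the first value the client sent.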