I'm trying to write a basic network server using pipes and the assorted libraries that build on it. The intended flow would be:
get bytestring from socket -> decode using binary -> server logic goes here -> send response to socket
Which I figured would be something like:
fromSocket s 4096 >-> decode >-> serverLogic >-> toSocket s
pipes-binary has a decode and a decodeMany, but I am not sure I understand the difference, and I don't know how to use decode. Why does decodeMany take the upstream pipe as an argument instead of being chained off of it with >->? And how do you use decode, what is the StateT for, and what should my pipe chain end up looking like?
The StateT (Producer a m r) m x idiom comes from pipes-parse's "Low-level Parsers". It typically means that the library is using draw and unDraw to pull values off a Producer and return them if they're unused. It's an essential component of parsing where failure might occur. It also requires the StateT layer to indicate that a pipe is being selectively drained and refilled in a stateful manner.
-- | Draw one element from the underlying Producer,
-- returning Left if the Producer is empty
draw :: Monad m => StateT (Producer a m r) m (Either r a)
-- | Push back an element onto the underlying Producer
unDraw :: Monad m => a -> StateT (Producer a m r) m ()
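To make the draw/unDraw idea concrete, here is a minimal sketch that swaps the Producer for a plain list and uses State from the transformers package. The names drawL, unDrawL and peekL are hypothetical stand-ins and not part of pipes-parse, and this toy version returns Maybe a rather than Either r a.

```haskell
import Control.Monad.Trans.State.Strict (State, evalState, get, modify, put)

-- Hypothetical analogue of draw: pop the next element off the source.
-- The source is modelled as a plain list instead of a Producer.
drawL :: State [a] (Maybe a)
drawL = do
    xs <- get
    case xs of
        []     -> return Nothing
        y : ys -> put ys >> return (Just y)

-- Hypothetical analogue of unDraw: push an element back onto the front.
unDrawL :: a -> State [a] ()
unDrawL x = modify (x :)

-- Peeking is a draw followed by an unDraw of the same element.
peekL :: State [a] (Maybe a)
peekL = do
    mx <- drawL
    case mx of
        Nothing -> return ()
        Just x  -> unDrawL x
    return mx

main :: IO ()
main = print (evalState ((,) <$> peekL <*> peekL) [1, 2 :: Int])
-- prints (Just 1,Just 1)
```

Because the source lives in the state, the second peekL sees the element the first one pushed back.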
So what does that mean for decode and decodeMany? If we look at some simplified types of those functions:
-- for (Monad m, Binary b)
decode :: StateT (Producer ByteString m r) m (Maybe b)
decodeMany :: Producer ByteString m r
           -> Producer' b m (Either (Producer ByteString m r) r)
We first see that decode is drawing off enough ByteString chunks from a Producer ByteString statefully so as to try to parse a b. Since the chunk boundaries of the ByteStrings may not align with a parse boundary, it's important to do this in StateT so that the leftover chunks can be unDraw-ed back into the Producer.
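The chunk-boundary problem can be sketched without pipes at all. The following toy model is an assumption-laden illustration, not the pipes-binary implementation: the source is a list of String chunks, a "record" is simply a fixed number of characters, and decodeFixed is a hypothetical name.

```haskell
import Control.Monad (unless)
import Control.Monad.Trans.State.Strict (State, get, modify, put, runState)

-- Hypothetical toy decoder: a record is exactly n characters.
-- Chunks are drawn until enough input is buffered; whatever is not
-- used is pushed back onto the source, mirroring unDraw.
decodeFixed :: Int -> State [String] (Maybe String)
decodeFixed n = go ""
  where
    go acc
        | length acc >= n = do
            let (record, rest) = splitAt n acc
            -- Return the unused tail of the last chunk to the source.
            unless (null rest) (modify (rest :))
            return (Just record)
        | otherwise = do
            chunks <- get
            case chunks of
                [] -> do
                    -- Out of input: restore what was drawn and fail.
                    unless (null acc) (put [acc])
                    return Nothing
                c : cs -> put cs >> go (acc ++ c)

main :: IO ()
main = print (runState ((,) <$> decodeFixed 4 <*> decodeFixed 4) ["ab", "cde", "f"])
-- prints ((Just "abcd",Nothing),["ef"])
```

The first decode consumes "ab" and part of "cde", then pushes "e" back. The second one fails and leaves "ef" in the source for whoever runs next.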
decodeMany builds atop decode and attempts to repeatedly decode b values off the input Producer, returning a "continuation" Producer of leftover ByteStrings on failure.
Long story short, because leftover ByteString chunks need to be unDraw-ed, we can't just compose these things together into a chain with (>->). If you want a chain, you can use something like decodeMany to transform a producer and then chain the result, but you'll want to handle error cases carefully.
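The "transform the source first, then chain" shape can be sketched with the same kind of toy model. Everything here is a hypothetical stand-in: the source is a list of String chunks, decodeRecord and decodeManyFixed are invented names, and map plays the role of the stages you would attach with (>->). It is not the pipes-binary API.

```haskell
import Control.Monad.Trans.State.Strict (State, runState, state)
import Data.Char (toUpper)

-- Hypothetical toy decoder: a record is exactly n characters.
-- On failure the source is left untouched.
decodeRecord :: Int -> State [String] (Maybe String)
decodeRecord n = state $ \chunks ->
    let (record, rest) = splitAt n (concat chunks)
    in if length record == n
           then (Just record, [rest | not (null rest)])
           else (Nothing, chunks)

-- Toy analogue of decodeMany: it takes the whole source as an argument
-- and returns the decoded records together with the undecoded leftovers.
decodeManyFixed :: Int -> [String] -> ([String], [String])
decodeManyFixed n = go
  where
    go chunks = case runState (decodeRecord n) chunks of
        (Just record, rest) ->
            let (records, leftover) = go rest
            in (record : records, leftover)
        (Nothing, leftover) -> ([], leftover)

main :: IO ()
main = do
    let (records, leftover) = decodeManyFixed 4 ["ab", "cdef", "ghi", "j"]
    -- Downstream stages only ever see whole records.
    print (map (map toUpper) records)  -- prints ["ABCD","EFGH"]
    -- The caller decides what to do with input that did not decode.
    print leftover                     -- prints ["ij"]
```

The leftover value is the part to handle carefully: in a server it may mean a truncated request or simply that more bytes have not arrived yet.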
I want to complement J. Abrahamson's answer by answering your other question about why the decoder is not a Pipe.
The difference between a Pipe with a type like:
pipe :: Pipe a b m r
... and a function between Producers like this (I call these "getter"s):
getter :: Producer a m r -> Producer b m r
... is that a Pipe can be used to transform Producers, Consumers, and other Pipes:
(>-> pipe) :: Producer a m r -> Producer b m r
(>-> pipe) :: Pipe x a m r -> Pipe x b m r
(pipe >->) :: Consumer b m r -> Consumer a m r
(pipe >->) :: Pipe b y m r -> Pipe a y m r
... whereas a "getter" can only transform Producers. Some things cannot be modeled correctly using Pipes and leftovers are one of those things.
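As a small illustration of the two shapes, the sketch below puts a Pipe and a "getter" side by side. It assumes the pipes package is installed; double and dropFirst are hypothetical names. This particular getter could also be written as a Pipe, so it only shows how each shape is applied, not a case where a getter is strictly required.

```haskell
import Pipes
import qualified Pipes.Prelude as P

-- An ordinary Pipe: it can be attached with (>->) on either side.
double :: Monad m => Pipe Int Int m r
double = P.map (* 2)

-- A hypothetical "getter": it needs the Producer itself as an argument
-- because it steps the source directly with next.
dropFirst :: Monad m => Producer a m r -> Producer a m r
dropFirst p = do
    step <- lift (next p)
    case step of
        Left r          -> return r   -- the source was already empty
        Right (_, rest) -> rest       -- continue with the remainder

main :: IO ()
main = print (P.toList (dropFirst (each [1, 2, 3 :: Int]) >-> double))
-- prints [4,6]
```

The getter is applied like a function to the source, and the result is an ordinary Producer again, so the rest of the pipeline can still be chained with (>->).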
conduit purports to model leftovers using Conduits (the conduit analog of Pipes) but it gets this wrong. I've put together a simple example showing why. First, just implement a peek function for conduit:
import Control.Monad.Trans.Class (lift)
import Data.Conduit
import Data.Conduit.List (isolate, sourceList)

peek :: Monad m => Sink a m (Maybe a)
peek = do
    ma <- await
    case ma of
        Nothing -> return ()
        Just a  -> leftover a
    return ma
This works as expected for simple cases like this:
source :: Monad m => Source m Int
source = sourceList [1, 2]

sink1 :: Show a => Sink a IO ()
sink1 = do
    ma1 <- peek
    ma2 <- peek
    lift $ print (ma1, ma2)
This will return the first element of the source twice:
>>> source $$ sink1
(Just 1,Just 1)
... but if you compose a Conduit upstream of a Sink, any leftovers that the sink pushes back are irreversibly lost:
sink2 :: Show a => Sink a IO ()
sink2 = do
    ma1 <- isolate 10 =$ peek
    ma2 <- peek
    lift $ print (ma1, ma2)
Now the second peek incorrectly returns 2:
>>> source $$ sink2
(Just 1,Just 2)
Also, note that pipes-parse just got a new major version released today, which simplifies the API and adds an extensive tutorial that you can read here.
This new API correctly propagates leftovers further upstream. Here is the analogous example for pipes:
import Lens.Family.State.Strict (zoom)
import Pipes
import Pipes.Parse
import Prelude hiding (splitAt)

parser :: Show a => Parser a IO ()
parser = do
    ma1 <- zoom (splitAt 10) peek
    ma2 <- peek
    lift $ print (ma1, ma2)

producer :: Monad m => Producer Int m ()
producer = each [1, 2]
Even though the first peek is also limited to the first 10 values, it correctly undraws the first value and makes it available to the second peek:
>>> evalStateT parser producer
(Just 1,Just 1)
Conceptually, the reason why pipes-parse "thinks in terms of Producers" is that otherwise the concept of leftovers is not clearly defined. If you don't clearly define what your source is, you can't clearly articulate where leftover values should go. This is why Pipes and Consumers do not lend themselves well to tasks that require leftovers.