The goal is to have a conduit with the following type signature:
protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a
The conduit should repeatedly parse protocol buffers (using the ByteString -> a function) received via TCP/IP (using the network-conduit package).
The wire message format is
{length (32 bits big endian)}{protobuf 1}{length}{protobuf 2}...
(The curly braces are not part of the protocol; they are only used here to separate the entities.)
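To make the framing concrete, here is a small sketch of an encoder for this format using `Data.Binary.Put` from the binary package. `encodeFrame` is a hypothetical helper name chosen for illustration, not part of any library.

```haskell
import Data.Binary.Put (runPut, putWord32be, putByteString)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL

-- Encode one frame: a 32-bit big-endian length followed by the payload bytes.
encodeFrame :: BS.ByteString -> BSL.ByteString
encodeFrame payload = runPut $ do
  putWord32be (fromIntegral (BS.length payload))
  putByteString payload

main :: IO ()
main = print (BSL.unpack (BSL.concat (map encodeFrame [BS.pack [104, 105], BS.pack [33]])))
```

Two frames ("hi" and "!") encode to the bytes 0,0,0,2,104,105,0,0,0,1,33.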
The first idea was to use sequenceSink to repeatedly apply a Sink that is able to parse one ProtoBuf:
[...]
import qualified Data.Binary as B
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Util as CU

protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a
protobufConduit protobufDecode =
    CU.sequenceSink () $ \() ->
        do lenBytes <- CB.take 4                     -- read protobuf length
           let len :: Word32
               len = B.decode lenBytes               -- decode ProtoBuf length
               intLen = fromIntegral len
           protobufBytes <- CB.take intLen           -- read the ProtoBuf bytes
           return $ CU.Emit () [ protobufDecode protobufBytes ] -- emit decoded ProtoBuf
It doesn't work (it only works for the first protocol buffer) because there seem to be a number of "leftover" bytes that were already read from the source but not consumed by CB.take, and these get discarded.
And I found no way of pushing "the rest" back into the source.
Did I get the concept entirely wrong?
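For reference, newer conduit versions do provide this "push back": a sink can return unconsumed input upstream with `leftover`, and the next sink composed after it in the same monadic chain receives those bytes. The sketch below assumes the conduit 1.3 API (`ConduitT`, `.|`, `runConduitPure`); `takeExact` and `twoTakes` are hypothetical names written for illustration (conduit-extra's `Data.Conduit.Binary.take` handles surplus bytes the same way).

```haskell
import Control.Monad (unless)
import qualified Data.ByteString as BS
import Data.Conduit (ConduitT, await, leftover, runConduitPure, (.|))
import qualified Data.Conduit.List as CL

-- Take exactly n bytes (or fewer if the input ends) and push any surplus back upstream.
takeExact :: Monad m => Int -> ConduitT BS.ByteString o m BS.ByteString
takeExact = go BS.empty
  where
    go acc n
      | n <= 0 = return acc
      | otherwise = do
          mchunk <- await
          case mchunk of
            Nothing -> return acc                    -- input ended early
            Just chunk -> do
              let (used, rest) = BS.splitAt n chunk
              unless (BS.null rest) (leftover rest)  -- the next sink will see these bytes
              go (acc <> used) (n - BS.length used)

-- Two sinks composed monadically: the second one receives the first one's leftovers.
twoTakes :: [BS.ByteString] -> (BS.ByteString, BS.ByteString)
twoTakes chunks = runConduitPure $ CL.sourceList chunks .| ((,) <$> takeExact 2 <*> takeExact 10)

main :: IO ()
main = print (twoTakes [BS.pack [1, 2, 3, 4, 5]])
```

Here the first `takeExact 2` reads the whole chunk [1,2,3,4,5], keeps [1,2] and hands [3,4,5] back, so the second sink still sees them.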
PS: Even though I use protocol buffers here, the problem is not related to protocol buffers. To debug it I always use {length}{UTF8 encoded string}{length}{UTF8 encoded string}... and a conduit similar to the one above (utf8StringConduit :: MonadResource m => Conduit ByteString m Text).
I just tried replacing the state (there is no real state, just (), in the sample above) with the remaining bytes, and replaced the CB.take calls with calls to a function that first consumes the already-read bytes (from the state) and calls await only as needed (when the state does not hold enough bytes). Unfortunately, that doesn't work either, because as soon as the Source has no bytes left, sequenceSink no longer runs the code, while the state still contains the remaining bytes :-(.
If you are interested in the code (which isn't optimized or very good, but should be enough to test):
utf8StringConduit :: forall m. MonadResource m => Conduit ByteString m Text
utf8StringConduit =
    CU.sequenceSink [] $ \st ->
        do (lengthBytes, st') <- takeWithState BS.empty st 4
           let len :: Word32
               len = B.decode $ BSL.fromChunks [lengthBytes]
               intLength = fromIntegral len
           (textBytes, st'') <- takeWithState BS.empty st' intLength
           return $ CU.Emit st'' [ TE.decodeUtf8 $ textBytes ]

takeWithState :: Monad m
              => ByteString
              -> [ByteString]
              -> Int
              -> Pipe l ByteString o u m (ByteString, [ByteString])
takeWithState acc state 0 = return (acc, state)
takeWithState acc state neededLen =
    let stateLenSum = foldl' (+) 0 $ map BS.length state
    in if stateLenSum >= neededLen
        then do let (firstChunk:state') = state
                    (neededChunk, pushBack) = BS.splitAt neededLen firstChunk
                    acc' = acc `BS.append` neededChunk
                    neededLen' = neededLen - BS.length neededChunk
                    state'' = if BS.null pushBack
                                  then state'
                                  else pushBack:state'
                takeWithState acc' state'' neededLen'
        else do aM <- await
                case aM of
                    Just a -> takeWithState acc (state ++ [a]) neededLen
                    Nothing -> error "to be fixed later"
For protocol buffer parsing and serializing we use messageWithLengthPutM and messageWithLengthGetM (see below), but I assume they use a varint encoding for the length, which is not what you need. I'd probably try to adapt our implementation below by replacing messageWithLengthGetM/messageWithLengthPutM with something like
myMessageWithLengthGetM =
    do size <- getWord32be
       getMessageWithSize size
but I have no idea how to implement getMessageWithSize using the functions available in the protocol buffer package. On the other hand, you could just use getByteString and then "reparse" the bytestring.
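A minimal sketch of the "getByteString and reparse" idea, using `Data.Binary.Get` from the binary package: read the 32-bit big-endian length, take exactly that many bytes, and pass them to whatever decoder you have (UTF-8 text stands in for a protocol buffer here). `getLengthPrefixed` and `getAllFrames` are hypothetical names. The Get used with messageWithLengthGetM appears to be the protocol-buffers package's own (its results are Failed/Partial/Finished), so treat this as an illustration rather than a drop-in replacement.

```haskell
import Data.Binary.Get (Get, getWord32be, getByteString, isEmpty, runGet)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE

-- Read one {length}{payload} frame, then hand the raw payload to a decoder.
getLengthPrefixed :: (BS.ByteString -> a) -> Get a
getLengthPrefixed decode = do
  size <- getWord32be                          -- 32-bit big-endian length
  payload <- getByteString (fromIntegral size) -- exactly `size` bytes
  return (decode payload)                      -- "reparse" the payload

-- Read frames until the input is exhausted.
getAllFrames :: (BS.ByteString -> a) -> Get [a]
getAllFrames decode = do
  done <- isEmpty
  if done
    then return []
    else (:) <$> getLengthPrefixed decode <*> getAllFrames decode

main :: IO ()
main = print (runGet (getAllFrames TE.decodeUtf8) (BSL.pack [0, 0, 0, 2, 104, 105, 0, 0, 0, 1, 33]))
```

Running it decodes the two frames "hi" and "!".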
Regarding conduits: have you tried implementing the conduit without Data.Conduit.Util? Something like
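protobufConduit protobufDecode = loop
  where
    loop =
      do lenBytes <- CB.take 4
         unless (BSL.null lenBytes) $ -- stop cleanly at end of input
           do bs <- CB.take (convertLen lenBytes)
              yield (protobufDecode bs)
              loop
    -- the length is a 32-bit big-endian Word32, as in the question
    convertLen = fromIntegral . (B.decode :: BSL.ByteString -> Word32)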
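A self-contained version of that loop, assuming the conduit 1.3 API (`ConduitT`, `.|`, `runConduitPure`) and `Data.Conduit.Binary` from conduit-extra, with UTF-8 text standing in for protocol buffers as in the question's debugging setup. It avoids the sequenceSink problem because `CB.take` hands any surplus bytes back as leftovers, which the next `CB.take` in the same loop then receives. `framedConduit` and `decodeChunks` are hypothetical names.

```haskell
import Control.Monad (unless)
import qualified Data.Binary as B
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Data.Conduit (ConduitT, yield, runConduitPure, (.|))
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.List as CL
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE
import Data.Word (Word32)

-- Frames are {length: 32-bit big endian}{payload}; decode each payload with f.
framedConduit :: Monad m => (BS.ByteString -> a) -> ConduitT BS.ByteString a m ()
framedConduit f = loop
  where
    loop = do
      lenBytes <- CB.take 4                  -- surplus bytes are pushed back as leftovers
      unless (BSL.null lenBytes) $ do        -- empty result means end of input
        -- Data.Binary's Word32 instance is big endian; a truncated (1-3 byte)
        -- length at end of input makes B.decode throw.
        let len = B.decode lenBytes :: Word32
        payload <- CB.take (fromIntegral len)
        yield (f (BSL.toStrict payload))
        loop

-- Run the conduit over a list of chunks and collect the decoded values.
decodeChunks :: [BS.ByteString] -> [T.Text]
decodeChunks chunks =
  runConduitPure $ CL.sourceList chunks .| framedConduit TE.decodeUtf8 .| CL.consume

main :: IO ()
main = print (decodeChunks [BS.pack [0, 0], BS.pack [0, 2, 104, 105, 0, 0, 0], BS.pack [5, 116, 104, 101, 114, 101]])
```

The three chunks split both frames ("hi" and "there") at awkward places, and both still come out intact.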
Here's the code we use:
pbufSerialize :: (ReflectDescriptor w, Wire w) => Conduit w IO ByteString
pbufSerialize = awaitForever f
  where f pb = M.mapM_ yield $ BSL.toChunks $ runPut (messageWithLengthPutM pb)

pbufParse :: (ReflectDescriptor w, Wire w, Show w) => Conduit ByteString IO w
pbufParse = new
  where
    new = read (runGet messageWithLengthGetM . BSL.fromChunks . (:[]))
    read parse =
      do mbs <- await
         case mbs of
           Just bs -> checkResult (parse bs)
           Nothing -> return ()
    checkResult result =
      case result of
        Failed _ errmsg -> fail errmsg
        Partial cont -> read (cont . Just . BSL.fromChunks . (:[]))
        Finished rest _ msg ->
          do yield msg
             checkResult (runGet messageWithLengthGetM rest)
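The same incremental pattern as pbufParse can be sketched with the binary package's incremental interface (`runGetIncremental`, `pushChunk`, and the `Fail`/`Partial`/`Done` constructors of `Decoder`), assuming the conduit 1.3 API. It decodes the question's fixed 32-bit length framing, with UTF-8 text in place of protocol buffers; `frame`, `incrementalParse` and `decodeChunks` are hypothetical names. Decoding failures are reported with `error` to keep the constraint at `Monad m`.

```haskell
import Data.Binary.Get (Get, Decoder (..), runGetIncremental, pushChunk, getWord32be, getByteString)
import qualified Data.ByteString as BS
import Data.Conduit (ConduitT, await, yield, runConduitPure, (.|))
import qualified Data.Conduit.List as CL
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE

-- One frame of the question's format: 32-bit big-endian length, then payload.
frame :: Get BS.ByteString
frame = getWord32be >>= getByteString . fromIntegral

-- Same shape as pbufParse: feed incoming chunks to an incremental decoder,
-- yield each completed value, and restart the decoder on the unconsumed bytes.
incrementalParse :: Monad m => Get a -> ConduitT BS.ByteString a m ()
incrementalParse g = next
  where
    fresh = runGetIncremental g
    -- Wait for the next chunk; stop quietly if the stream ends between frames.
    next = await >>= maybe (return ()) (step . pushChunk fresh)
    step (Fail _ _ err) = error ("incrementalParse: " ++ err)
    step (Partial k) = await >>= step . k     -- k Nothing (EOF mid-frame) yields Fail
    step (Done rest _ x) = do
      yield x
      if BS.null rest then next else step (pushChunk fresh rest)

decodeChunks :: [BS.ByteString] -> [T.Text]
decodeChunks chunks =
  runConduitPure $ CL.sourceList chunks .| incrementalParse (TE.decodeUtf8 <$> frame) .| CL.consume

main :: IO ()
main = print (decodeChunks [BS.pack [0, 0], BS.pack [0, 2, 104, 105, 0, 0, 0], BS.pack [5, 116, 104, 101, 114, 101]])
```

Like pbufParse, the bytes left over after one value (the `rest` of `Done`) are fed straight into a fresh decoder, so nothing is lost between frames.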