Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How can I turn a Sink into a Conduit?

I'm trying to write a Conduit using an attoparsec parser. Specifically, given parseOne :: Parser T, I'd like to construct a Conduit ByteString m T that repeatedly applies the parser to the input and streams the results.

attoparsec-conduit offers sinkParser to turn a Parser into a Sink, but how can I turn this Sink into a Conduit? What I'm looking for is a function like:

conduitSink :: (Resource m) => Sink a m b -> Conduit a m b

which repeatedly feeds data into the Sink, producing each result as it goes. It seems like it could be written fairly easily as a manual loop, but I'm wondering if there's a better way.

The lack of this seemingly-obvious function in the conduit library makes me think I might be doing something wrong; is there a better way to accomplish this? The use case is turning raw bytes into the parsed form of a message-based network protocol, to be processed by later stages of the pipeline. I already have the opposite direction (i.e. Conduit T m ByteString) thanks to blaze-builder-conduit, so this seemed like the most natural way to structure things.

like image 696
ehird Avatar asked Jan 28 '12 04:01

ehird


1 Answers

You need to use the SequencedSink system for this; it uses a sink and a tracked state to produce a conduit from the repeated application of a sink producer.

The sink you have created is optimized for incrementally parsing one value, which will be the result at the end of a conduit sequence.

Since you want this to be part of a conduit pipeline, however, and each chunk of the incoming ByteString might or might not match your parser one or multiple times, you need to take care to get more fine-grained control of the parsing process, passing on the state of an incomplete parse between every application of the sink.

Assuming, for example, that your parser parses [--] or [----] etc., and T is Int denoting the number of dashes parsed, you need to track the state of the parser as demonstrated by this:

Input chunk    Sink result - Data.Conduit.SequencedSinkResponse
[--][---]      Emit Nothing [2, 3]
[---][---      Emit (Just #func) [3]
---------      Emit (Just #func) []
]              Emit Nothing [12]
               Stop

In this case, I use Maybe (ByteString -> Data.Attoparsec.ByteString.Result) as the passed-on state; a different data type might be more suitable depending on the situation.

This explicit stream treatment is needed to maintain the pipeline nature of the conduit; having the parser conduit be a "bottleneck", always waiting for enough data to chunk-wise satisfy a parser, would be a major performance sink.

The implementation of the needed sink should be fairly trivial with the available ResourceT monad interface.

EDIT: Simply applying your sink in a loop would indeed be the simplest solution, but it will have slightly different performance characteristics if your parser parses short snippets that often end up on the borders of byte chunks.

like image 165
dflemstr Avatar answered Oct 20 '22 05:10

dflemstr