I use Haskell for line-based data processing, i.e. tasks where you would otherwise reach for sed, awk and similar tools. As a trivial example, let's prepend 000 to every line read from standard input (what sed 's/^/000/' does).

I have three alternative ways to do the task: lazy ByteString processing ("lazy"), line-based conduit processing ("lines"), and chunk-based conduit processing with pure ByteString manipulation inside ("chunks").

example.hs:
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MultiWayIf #-}

import ClassyPrelude.Conduit
import qualified Data.ByteString.Char8 as B8
import qualified Data.ByteString.Lazy.Char8 as BL8
import qualified Data.Conduit.Binary as CB

main = do
  [arg] <- getArgs
  case arg of
    "lazy"   -> BL8.getContents >>= BL8.putStr . BL8.unlines . map ("000" ++) . BL8.lines
    "lines"  -> runConduitRes $ stdinC .| CB.lines .|
                  mapC ("000" ++) .| mapC (`snoc` 10) .| stdoutC
    "chunks" -> runConduitRes $ stdinC .| lineChunksC .|
                  mapC (B8.unlines . map ("000" ++) . B8.lines) .| stdoutC

-- Regroup incoming chunks so that every chunk we yield ends on a newline
-- boundary (byte 10); no line is ever split across two yielded chunks.
lineChunksC :: Monad m => Conduit ByteString m ByteString
lineChunksC = await >>= maybe (return ()) go
  where
    go acc = if
      | Just (_, 10) <- unsnoc acc -> yield acc >> lineChunksC
      | otherwise -> await >>= maybe (yield acc) (go' . breakAfterEOL)
      where
        go' (this, next) =
          let acc' = acc ++ this
          in if null next then go acc' else yield acc' >> go next

-- Split a chunk just after its first newline, keeping the newline on the left.
breakAfterEOL :: ByteString -> (ByteString, ByteString)
breakAfterEOL = uncurry (\x -> maybe (x, "") (first (snoc x)) . uncons) . break (== 10)
$ stack ghc --package={classy-prelude-conduit,conduit-extra} -- -O2 example.hs -o example
$ for cmd in lazy lines chunks; do echo $cmd; time -p seq 10000000 | ./example $cmd > /dev/null; echo; done
lazy
real 2.99
user 3.06
sys 0.07

lines
real 3.30
user 3.36
sys 0.06

chunks
real 1.83
user 1.95
sys 0.06
(The results are consistent across multiple runs, and they also hold for input lines containing several numbers.)
So chunks is about 1.6x faster than lazy, which in turn is a bit faster than lines. This means that conduits can be faster than plain lazy ByteStrings, but the overhead of the conduit pipes becomes too heavy once you split the chunks into short lines.

What I don't like about the chunks approach is that it mixes the conduit and pure worlds, which makes it harder to use for more complex tasks.

The question is, did I miss a simple and elegant solution which would allow me to write efficient code in the same fashion as with the lines approach?
EDIT1: Per @Michael's suggestion I joined the two mapC into a single mapC (("000" ++) . (`snoc` 10)) in the lines solution, to make the number of pipes (.|) the same between lines and chunks. This made it perform a bit better (down from 3.3s to 2.8s), but still significantly slower than chunks.
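For concreteness, a sketch of what the lines branch looks like after this change (same pipeline as in example.hs above, just with the two mapC stages fused into one):

"lines" -> runConduitRes $ stdinC .| CB.lines .|
             mapC (("000" ++) . (`snoc` 10)) .| stdoutC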
Also I tried the older Conduit.Binary.lines which Michael suggested in the comments, and it also improves performance a bit, by ~0.1s.
EDIT2: Fixed lineChunksC so it works with very small chunks. In the check below, concatC explodes the input into individual bytes and mapC singleton re-wraps each byte as a one-byte ByteString, so lineChunksC is fed the smallest possible chunks and still reassembles the lines correctly:
> runConduitPure $ yield ("\nr\n\n"::ByteString) .| concatC .| mapC singleton .| lineChunksC .| sinkList
["\n","r\n","\n"]
My guess is that, for "lines", the mapC ("000" ++) .| mapC (`snoc` 10) part is doing a lot of work.

Concatenating several strict ByteStrings into another strict ByteString is expensive. Concatenating them into a lazy ByteString tends to be more efficient.

To avoid this cost, you can yield each part individually downstream as a strict ByteString (but be aware that then we aren't talking about "lines" anymore).

Alternatively, yield each transformed line as a lazy ByteString downstream.
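As a rough illustration of the first option, here is a minimal sketch (not code from the answer; prependPartsC is a hypothetical name, and it assumes the conduit 1.3 ConduitT type and lines that arrive without their trailing newline, as CB.lines produces them):

{-# LANGUAGE OverloadedStrings #-}
import Data.ByteString (ByteString)
import Data.Conduit (ConduitT, awaitForever, yield)

-- Re-emit every incoming line as three separate strict chunks, so the prefix,
-- the line and the newline terminator are never concatenated per line.
prependPartsC :: Monad m => ConduitT ByteString ByteString m ()
prependPartsC = awaitForever $ \line -> do
  yield "000"  -- the prefix, as its own chunk
  yield line   -- the original line bytes, untouched
  yield "\n"   -- the terminator, as its own chunk

The lazy-ByteString alternative would instead build each output line with something like BL8.fromChunks ["000", line, "\n"] and yield that, letting the existing chunks be shared rather than copied.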
The question is, did I miss a simple and elegant solution which would allow me to write efficient code in the same fashion as with the lines approach?
Some streaming libraries have an interesting feature: you can delimit lines in the stream, and manipulate them, without the need to materialize entire lines in memory at any point.
Here I'm using the streaming and streaming-bytestring packages, because I'm more familiar with them.
In the module Data.ByteString.Streaming.Char8 of streaming-bytestring, we have the lines function:

lines :: Monad m => ByteString m r -> Stream (ByteString m) m r

lines turns a ByteString into a connected stream of ByteStrings divided at newline characters. The resulting strings do not contain newlines. This is the genuinely streaming lines which only breaks chunks, and thus never increases the use of memory.
The gist of it is that ByteString m r is already a streaming type! So this version of lines transforms a stream into a "stream of streams", and we can only reach the "next stream" (the next line) by exhausting the "current stream" (the current line).
Your "lines" example could be written as:
{-# language OverloadedStrings #-}
module Main where
import Control.Applicative ((*>))
import Streaming
import qualified Streaming.Prelude as S
import qualified Data.ByteString.Streaming.Char8 as Q
main :: IO ()
main = Q.stdout
     . Q.unlines
     . S.maps (\line -> "000" *> line)
     . Q.lines
     $ Q.stdin
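Two details worth spelling out (my reading of the streaming and streaming-bytestring APIs): S.maps applies a function to each functor layer of the stream, i.e. to each individual line, and because the streaming ByteString m type is a monad in its result, "000" *> line emits the chunk "000" and then streams the bytes of line, without ever assembling the whole line in memory. Written out with a hypothetical name, the lambda is just:

prependLine :: Monad m => Q.ByteString m r -> Q.ByteString m r
prependLine line = "000" *> line  -- emit the prefix chunk, then the line's own chunks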