Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Ways to improve performance for line-based conduits

I use haskell for line-based data processing, i.e. tasks where you can apply sed, awk and similar tools. As a trivial example, let's prepend 000 to every line from standard input.

I have three alternative ways to do the task:

  1. Lazy IO with lazy ByteStrings
  2. Line-based conduit.
  3. Chunk-based conduit with pure strict ByteString processing inside.

example.hs:

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE FlexibleContexts #-}

import ClassyPrelude.Conduit
import qualified Data.ByteString.Char8 as B8
import qualified Data.ByteString.Lazy.Char8 as BL8
import qualified Data.Conduit.Binary as CB

main = do
  [arg] <- getArgs
  case arg of

    "lazy" -> BL8.getContents >>= BL8.putStr . BL8.unlines . map ("000" ++) . BL8.lines

    "lines" -> runConduitRes $ stdinC .| CB.lines .|
      mapC ("000" ++) .| mapC (`snoc` 10) .| stdoutC

    "chunks" -> runConduitRes $ stdinC .| lineChunksC .|
      mapC (B8.unlines . (map ("000" ++)) . B8.lines) .| stdoutC


lineChunksC :: Monad m => Conduit ByteString m ByteString
lineChunksC = await >>= maybe (return ()) go
  where
  go acc = if
    | Just (_, 10) <- unsnoc acc -> yield acc >> lineChunksC
    | otherwise -> await >>= maybe (yield acc) (go' . breakAfterEOL)
    where
    go' (this, next) = let acc' = acc ++ this in if null next then go acc' else yield acc' >> go next

breakAfterEOL :: ByteString -> (ByteString, ByteString)
breakAfterEOL = uncurry (\x -> maybe (x, "") (first (snoc x)) . uncons) . break (== 10)
$ stack ghc --package={classy-prelude-conduit,conduit-extra} -- -O2 example.hs -o example
$ for cmd in lazy lines chunks; do echo $cmd; time -p seq 10000000 | ./example $cmd > /dev/null; echo; done
lazy
real 2.99
user 3.06
sys 0.07

lines
real 3.30
user 3.36
sys 0.06

chunks
real 1.83
user 1.95
sys 0.06

(The results are consistent across multiple runs, and also hold for lines with several numbers).

So chunks is 1.6x faster than lines which is a bit faster than lazy. This means that conduits can be faster than plain bytestrings, but the overhead of conduit pipes is too heavy when you split chunks into short lines.

What I don't like about chunks approach is that it mixes both conduit and pure worlds, and it makes it harder to use it for more complex tasks.

The question is, did I miss a simple and elegant solution which would allow me to write efficient code in same fashion as with lines approach?

EDIT1: Per @Michael's suggestion I joined two mapC into one mapC (("000" ++). (snoc10)) in lines solution, to make number of pipes (.|) same between lines and chunks. This made it perform a bit better (down from 3.3s to 2.8s), but still significantly slower than chunks.

Also I tried older Conduit.Binary.lines which Michael suggested in comments, and it also improves performance a bit, by ~0.1s.

EDIT2: Fixed lineChunksC so it works with very small chunks, e.g.

> runConduitPure $ yield ("\nr\n\n"::ByteString) .| concatC .| mapC singleton .| lineChunksC .| sinkList 
["\n","r\n","\n"]
like image 623
modular Avatar asked Oct 29 '16 13:10

modular


1 Answers

My guess is that, for "lines", the mapC ("000" ++) .| mapC (`snoc` 10) part is doing a lot of work.

Concatenating several strict ByteStrings into another strict ByteString is expensive. Concatenating them into a lazy ByteString tends to be more efficient.

To avoid this cost, you can yield each part individually downstream as a strict ByteString (but be aware that then we aren't talking about "lines" anymore).

Alternatively, yield each transformed line as a lazy ByteString downstream.


The question is, did I miss a simple and elegant solution which would allow me to write efficient code in same fashion as with lines approach?

Some streaming libraries have an interesting feature: you can delimit lines in the stream, and manipulate them, without the need to materialize entire lines in memory at any point.

Here I'm using the streaming and streaming-bytestring packages, because I'm more familiar with them.

In module Data.ByteString.Streaming.Char8 of streaming-bytestring, we have the lines function:

lines :: Monad m => ByteString m r -> Stream (ByteString m) m r

lines turns a ByteString into a connected stream of ByteStrings at divide at newline characters. The resulting strings do not contain newlines. This is the genuinely streaming lines which only breaks chunks, and thus never increases the use of memory.

The gist of it is that ByteString m r is already a streaming type! So this version of lines transforms a stream into a "stream of streams". And we can only reach the "next stream" (next line) by exhausting the "current stream" (current line).

Your "lines" example could be written as:

{-# language OverloadedStrings #-}
module Main where

import Control.Applicative ((*>))
import Streaming
import qualified Streaming.Prelude as S
import qualified Data.ByteString.Streaming.Char8 as Q

main :: IO ()
main = Q.stdout
     . Q.unlines
     . S.maps (\line -> "000" *> line)
     . Q.lines
     $ Q.stdin
like image 129
danidiaz Avatar answered Oct 21 '22 06:10

danidiaz