Parallel processing in conduit flow

I really like the concept of conduit/pipes for applying operations to a streaming IO source. I am interested in building tools that work on very large log files. One of the attractions of moving to Haskell from Python/Ruby is that parallel code is easier to write, but I can't find any documentation on this. How could I set up a conduit flow that reads lines from a file and processes them in parallel (i.e. with 8 cores, it should read eight lines, hand them off to eight different threads to be processed, collect the results, and so on), ideally with as little "ceremony" as possible?

Optionally, it would help to know whether the lines need to be rejoined in order, and whether that choice affects the speed of the process.

I am sure I could cobble something together myself using ideas from the Parallel Haskell book, but shouldn't running a pure function in parallel (parMap etc.) in the middle of a Conduit workflow be very easy?
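For the pure-function case the question describes, a minimal sketch using conduit alone might look like the following. It assumes conduit >= 1.3 (for ConduitT, .| and runConduitPure) and the parallel package (for parMap and rdeepseq); parMapC and doubleAll are hypothetical names made up for this example, not library functions. Each batch of n inputs is evaluated with parMap rdeepseq before its results are yielded downstream, so the output order matches the input order.

```haskell
import Control.DeepSeq (NFData)
import Control.Monad (unless)
import Control.Parallel.Strategies (parMap, rdeepseq)
import Data.Conduit (ConduitT, runConduitPure, yield, (.|))
import qualified Data.Conduit.List as CL

-- Hypothetical helper: take up to n inputs at a time, evaluate f on the
-- whole batch in parallel with parMap rdeepseq, then yield the results
-- in their original order.
parMapC :: (Monad m, NFData b) => Int -> (a -> b) -> ConduitT a b m ()
parMapC n f = loop
  where
    loop = do
      batch <- CL.take (max 1 n)  -- returns [] once upstream is exhausted
      unless (null batch) $ do
        mapM_ yield (parMap rdeepseq f batch)
        loop

-- Example pipeline: double each number, four numbers per batch.
doubleAll :: [Int] -> [Int]
doubleAll xs = runConduitPure (CL.sourceList xs .| parMapC 4 (* 2) .| CL.consume)
```

To get actual parallelism, compile with -threaded and run with +RTS -N8 (or however many cores are available). For instance, doubleAll [1 .. 10] should return [2,4,6,8,10,12,14,16,18,20].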

asked Nov 04 '14 at 18:11 by Stian Håklev



1 Answer

As an example of the "internal parallelism" mentioned by Petr Pudlák in his comment, consider this function (I'm using pipes, but it could be implemented with conduit just as easily):

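import Control.Monad (forever)
import Control.Lens (view)
import Control.Concurrent.Async (mapConcurrently)
import Pipes
import qualified Pipes.Group as G
import qualified Control.Foldl as L

-- Read the input in batches of groupsize, run the action on each batch
-- concurrently, and yield the results one at a time (in input order).
concProd :: Int -> (a -> IO b) -> Producer a IO r -> Producer b IO r
concProd groupsize action producer =
      -- Split into sub-producers of groupsize elements; fold each into a list.
      L.purely G.folds L.list (view (G.chunksOf groupsize) producer)
      >->
      -- For every list: run the actions concurrently, then yield each result.
      forever (await >>= liftIO . mapConcurrently action >>= mapM_ yield)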

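This function takes as parameters a group size, an action to run on each value of type a, and a Producer of a values.

It returns a new Producer. Internally, it reads the a values in batches of groupsize, runs the action on each batch concurrently, and yields the results one by one. Because mapConcurrently returns its results in the same order as its input list, the output order matches the input order; the cost is that each batch has to wait for its slowest element before the next batch starts.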
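Since the same batching idea carries over to conduit, here is a hedged sketch of a conduit counterpart to concProd. It assumes conduit >= 1.3 and the async package; concC and lineLengths are hypothetical names. CL.take pulls up to groupsize elements, mapConcurrently runs the IO action on each of them in its own thread, and the results are yielded in input order.

```haskell
import Control.Concurrent.Async (mapConcurrently)
import Control.Monad (unless)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Conduit (ConduitT, runConduit, yield, (.|))
import qualified Data.Conduit.List as CL

-- Hypothetical conduit counterpart of concProd: read up to groupsize
-- inputs, run the IO action on all of them concurrently, then yield the
-- results in the same order as the inputs.
concC :: MonadIO m => Int -> (a -> IO b) -> ConduitT a b m ()
concC groupsize action = loop
  where
    loop = do
      batch <- CL.take (max 1 groupsize)  -- [] once upstream is exhausted
      unless (null batch) $ do
        results <- liftIO (mapConcurrently action batch)
        mapM_ yield results
        loop

-- Example: compute line lengths, three lines per batch.
lineLengths :: [String] -> IO [Int]
lineLengths ls =
  runConduit (CL.sourceList ls .| concC 3 (pure . length) .| CL.consume)
```

As with concProd, each batch waits for its slowest element before the next one is read, which keeps the output ordered at some cost in throughput.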

To form the batches, concProd uses Pipes.Group to "partition" the original producer into sub-producers of size groupsize, and then Control.Foldl to "fold" each sub-producer into a list.
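To see what the grouping step produces on its own, the sketch below applies the same expression, L.purely G.folds L.list (view (G.chunksOf n) p), to a small in-memory producer. It assumes the pipes, pipes-group, foldl and lens packages, as the answer's code does; batches and batchList are hypothetical names.

```haskell
import Control.Lens (view)
import qualified Control.Foldl as L
import Pipes (Producer, each)
import qualified Pipes.Group as G
import qualified Pipes.Prelude as P

-- Split a producer into sub-producers of n elements each (G.chunksOf),
-- then fold every sub-producer into a list (L.purely G.folds L.list).
batches :: Monad m => Int -> Producer a m r -> Producer [a] m r
batches n p = L.purely G.folds L.list (view (G.chunksOf n) p)

-- Run the grouping over an ordinary list and collect the batches.
batchList :: Int -> [a] -> [[a]]
batchList n xs = P.toList (batches n (each xs))
```

For example, batchList 3 [1 .. 7] should give [[1,2,3],[4,5,6],[7]]: the last group is simply shorter.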

For more sophisticated tasks, you could turn to the asynchronous channels provided by pipes-concurrency or stm-conduit. But these pull you somewhat out of the "single pipeline" worldview of vanilla pipes/conduit.
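The channel-based style can be illustrated without either library. The sketch below swaps in a plain worker pool built on Control.Concurrent.Chan from base; it does not use the pipes-concurrency or stm-conduit APIs, and poolMap is a hypothetical name. Because each worker writes its result as soon as it finishes, a slow element does not hold up the others, but results arrive in completion order rather than input order, which is the ordering trade-off raised in the question.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Exception (evaluate)
import Control.Monad (forM_, replicateM, replicateM_)

-- Hypothetical worker pool: a fixed number of threads take jobs from a
-- shared channel and write results to another channel. Results come back
-- in completion order, which need not match the input order.
poolMap :: Int -> (a -> b) -> [a] -> IO [b]
poolMap workers f xs = do
  jobs <- newChan
  results <- newChan
  -- Each worker processes jobs until it reads the Nothing sentinel.
  let worker = do
        job <- readChan jobs
        case job of
          Nothing -> return ()
          Just x -> do
            y <- evaluate (f x)  -- force to WHNF inside the worker thread
            writeChan results y
            worker
  replicateM_ n (forkIO worker)
  forM_ xs (writeChan jobs . Just)
  replicateM_ n (writeChan jobs Nothing)  -- one stop signal per worker
  replicateM (length xs) (readChan results)
  where
    n = max 1 workers
```

The sketch only forces each result to weak head normal form, has no error handling (an exception in f kills its worker and leaves the caller waiting), and needs the length of the input up front, so a streaming version would have to count outstanding jobs as it goes.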

answered Sep 23 '22 at 15:09 by danidiaz
