I really like the concept of conduit/pipes for applying operations to a streaming IO source, and I am interested in building tools that work on very large log files. One of the attractions of moving to Haskell from Python/Ruby is that parallel code is easier to write, but I can't find any documentation on doing this with conduit or pipes. How could I set up a conduit flow that reads lines from a file and works on them in parallel (i.e., with 8 cores, it should read eight lines, hand them off to eight different threads to be processed, then collect the results, and so on), ideally with as little "ceremony" as possible?
Optionally, an answer could note whether the lines need to be rejoined in order, in case that influences the speed of the process.
I am sure it would be possible to cobble together something myself using ideas from the Parallel Haskell book, but it seems to me that running a pure function in parallel (parmap etc) in the middle of a Conduit workflow should be very easy?
As an example of the "internal parallelism" mentioned by Petr Pudlák in his comment, consider this function (I'm using `pipes`, but it could be implemented with `conduit` just as easily):
    import Control.Monad (forever)
    import Control.Lens (view)
    import Control.Concurrent.Async (mapConcurrently)
    import Pipes
    import qualified Pipes.Group as G
    import qualified Control.Foldl as L

    -- Run the action concurrently on batches of groupsize values.
    concProd :: Int -> (a -> IO b) -> Producer a IO r -> Producer b IO r
    concProd groupsize action producer =
        -- Split the producer into chunks and fold each chunk into a list.
        L.purely G.folds L.list (view (G.chunksOf groupsize) producer)
        >->
        -- Process each list concurrently, then yield the results one by one.
        forever (await >>= liftIO . mapConcurrently action >>= mapM_ yield)
This function takes as parameters a group size, an action we want to run for each value of type `a`, and a `Producer` of `a` values.
It returns a new `Producer`. Internally, the producer reads `a` values in batches of `groupsize`, processes them concurrently, and yields the results one by one.
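As a rough illustration of how the function might be called, here is a minimal sketch. It repeats the definition of `concProd` so that it compiles on its own; `lineLength` and `demo` are hypothetical names made up for this example, and the in-memory list stands in for the lines of a log file.

```haskell
import Control.Concurrent.Async (mapConcurrently)
import Control.Lens (view)
import Control.Monad (forever)
import qualified Control.Foldl as L
import Pipes
import qualified Pipes.Group as G
import qualified Pipes.Prelude as P

-- Same function as in the answer, repeated so the sketch is self-contained.
concProd :: Int -> (a -> IO b) -> Producer a IO r -> Producer b IO r
concProd groupsize action producer =
    L.purely G.folds L.list (view (G.chunksOf groupsize) producer)
    >->
    forever (await >>= liftIO . mapConcurrently action >>= mapM_ yield)

-- Hypothetical per-line work: stands in for whatever processing a log line needs.
lineLength :: String -> IO Int
lineLength = pure . length

-- Process five "lines" in batches of three and collect the results.
-- mapConcurrently keeps the order of each batch, so the output order
-- matches the input order: [1,2,3,4,5]
demo :: IO [Int]
demo = P.toListM (concProd 3 lineLength (each ["a", "bb", "ccc", "dddd", "eeeee"]))
```

To read from a real file, the `each [...]` source could be swapped for `P.fromHandle h` inside `withFile`, with the group size set to the number of cores. The threads only spread across cores if the program is built with `-threaded` and run with `+RTS -N`. Also, if the action merely wraps a pure function, it probably needs to force its result (for example with `evaluate`), otherwise the actual work may be deferred until the value is consumed downstream.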
The code uses `Pipes.Group` to "partition" the original producer into sub-producers of size `groupsize`, and then `Control.Foldl` to "fold" each sub-producer into a list.
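The partition-and-fold step can also be looked at in isolation. The sketch below pulls it out into a helper; `chunked` and `chunkDemo` are names invented here for illustration, not part of any library.

```haskell
import Control.Lens (view)
import qualified Control.Foldl as L
import Pipes
import qualified Pipes.Group as G
import qualified Pipes.Prelude as P

-- Partition a producer into groups of at most n elements and fold each
-- group into a list (the first half of concProd, without the concurrency).
chunked :: Monad m => Int -> Producer a m r -> Producer [a] m r
chunked n = L.purely G.folds L.list . view (G.chunksOf n)

-- Seven elements in groups of three; the last group is shorter.
-- chunkDemo == [[1,2,3],[4,5,6],[7]]
chunkDemo :: [[Int]]
chunkDemo = P.toList (chunked 3 (each [1 .. 7]))
```

Only one group is held in memory at a time, so the group size bounds both the memory used per batch and the number of threads started per batch.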
For more sophisticated tasks, you could turn to the asynchronous channels provided by `pipes-concurrency` or `stm-conduit`. But these yank you somewhat out of the "single pipeline" worldview of vanilla pipes/conduits.
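For comparison, here is a rough sketch of what a channel-based worker pool might look like with `pipes-concurrency`. It is one possible arrangement, not a canonical one: `concUnordered` and `unorderedDemo` are hypothetical names, the buffer size of 64 is an arbitrary assumption, and results come out in completion order rather than input order, which only suits the case where the lines do not need to be rejoined in order.

```haskell
import Control.Concurrent.Async (replicateConcurrently_, wait, withAsync)
import Control.Concurrent.STM (atomically)
import Control.Exception (finally)
import Pipes
import Pipes.Concurrent (bounded, fromInput, spawn', toOutput)
import qualified Pipes.Prelude as P

-- Hypothetical helper: run `action` on a pool of `workers` threads and hand
-- the stream of results (in completion order) to the continuation `k`.
concUnordered
    :: Int -> (a -> IO b) -> Producer a IO () -> (Producer b IO () -> IO r) -> IO r
concUnordered workers action source k = do
    -- One mailbox for pending work, one for finished results.
    (workOut, workIn, sealWork) <- spawn' (bounded 64)
    (resOut, resIn, sealRes)    <- spawn' (bounded 64)
    let -- Push every input into the work mailbox, then seal it so that
        -- the workers stop once it has been drained.
        feed = runEffect (source >-> toOutput workOut)
                   `finally` atomically sealWork
        -- Each worker pulls items, processes them, and pushes the results.
        -- When all workers are done, seal the result mailbox.
        pool = replicateConcurrently_ workers
                   (runEffect (fromInput workIn >-> P.mapM action >-> toOutput resOut))
                   `finally` atomically sealRes
    withAsync feed $ \feeder ->
        withAsync pool $ \ws -> do
            r <- k (fromInput resIn)
            -- If k stopped early, sealing unblocks the feeder and the workers.
            atomically (sealWork >> sealRes)
            wait feeder  -- rethrows any exception from the feeder
            wait ws      -- rethrows any exception from a worker
            pure r

-- Double 100 numbers on four workers; the result order is not guaranteed.
unorderedDemo :: IO [Int]
unorderedDemo = concUnordered 4 (pure . (* 2)) (each [1 .. 100]) P.toListM
```

Compared with `concProd`, a slow item here does not hold up the rest of its batch, but the producer, the workers, and the consumer now live in separate pipelines connected by mailboxes.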