 

How to `map(...)` in parallel on a custom, single-threaded iterator in Rust?

I have a MyReader that implements Iterator and produces Buffers, where Buffer: Send. MyReader produces a lot of Buffers very quickly, but I have a CPU-intensive job to perform on each Buffer (.map(|buf| ...)) that is my bottleneck, and then I gather the results (ordered). I want to parallelize the CPU-intensive work, hopefully across N threads that would use work stealing to process the items as fast as the number of cores allows.

Edit: To be more precise: I am working on rdedup. MyStruct is Chunker, which reads io::Read (typically stdio), finds parts (chunks) of the data and yields them. Then map() is supposed, for each chunk, to calculate its sha256 digest, compress, encrypt, save, and return the digest as the result of map(...). The digest of the saved data is used to build an index of the data. The order in which chunks are processed by map(...) does not matter, but the digest returned from each map(...) needs to be collected in the same order that the chunks were found. The actual save-to-file step is offloaded to yet another thread (the writer thread). actual code of PR in question

I hoped I could use rayon for this, but rayon expects an iterator that is already parallelizable, e.g. a Vec<...> or something like that. I have found no way to get a par_iter from MyReader; my reader is very single-threaded in nature.

There is simple_parallel, but the documentation says it's not recommended for general use, and I want to make sure everything will just work.

I could just take an spmc queue implementation and a custom thread pool, but I was hoping for an existing solution that is optimized and tested.

There's also pipeliner, but it doesn't support an ordered map yet.

dpc.pw, asked Feb 27 '17


1 Answer

In general, preserving order is a pretty tough requirement as far as parallelization goes.

You could try to hand-make it with a typical fan-out/fan-in setup:

  • a single producer which tags inputs with a sequential monotonically increasing ID,
  • a thread pool which consumes from this producer and then sends the result toward the final consumer,
  • a consumer which buffers and reorders results so as to process them in sequential order.

Or you could raise the level of abstraction.


Of specific interest here: Future.

A Future represents the result of a computation which may or may not have happened yet. A consumer receiving an ordered list of Futures can simply wait on each one in turn, and let buffering occur naturally in the queue.

For bonus points, if you use a fixed-size queue, you automatically get back-pressure on the producer.


And therefore I would recommend building something on top of CpuPool.

The setup is going to be:

extern crate futures;
extern crate futures_cpupool;

use futures::Future;
use futures_cpupool::CpuPool;
use std::sync::mpsc::{Receiver, Sender};

fn produce(sender: Sender<...>) {
    let pool = CpuPool::new_num_cpus();

    for chunk in reader {
        let future = pool.spawn_fn(|| /* do work */);
        sender.send(future).expect("consumer hung up");
    }

    // Dropping the sender signals there's no more work to the consumer
}

fn consume(receiver: Receiver<...>) {
    while let Ok(future) = receiver.recv() {
        let item = future.wait().expect("Computation Error?");

        /* do something with item */
    }
}

fn main() {
    let (sender, receiver) = std::sync::mpsc::channel();

    std::thread::spawn(move || consume(receiver));

    produce(sender);
}
Matthieu M., answered Nov 08 '22