I have a `MyReader` that implements `Iterator` and produces `Buffer`s, where `Buffer: Send`. `MyReader` produces a lot of `Buffer`s very quickly, but I have a CPU-intensive job to perform on each `Buffer` (`.map(|buf| ...)`) that is my bottleneck, and then I gather the results (ordered). I want to parallelize the CPU-intensive work, hopefully across N threads that would use work stealing to perform it as fast as the number of cores allows.
Edit: To be more precise: I am working on rdedup. `MyStruct` is `Chunker`, which reads an `io::Read` (typically stdio), finds parts (chunks) of the data and yields them. Then `map()` is supposed to, for each chunk, calculate the sha256 digest of it, compress, encrypt, save, and return the digest as the result of `map(...)`. The digest of the saved data is used to build the index of the data. The order in which chunks are processed by `map(...)` does not matter, but the digest returned from each `map(...)` needs to be collected in the same order that the chunks were found. The actual save-to-file step is offloaded to yet another thread (the writer thread).
Actual code of the PR in question.
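To sketch what each `map(...)` call does per chunk (the helper names below are placeholders of mine, not rdedup's actual API):

```rust
use std::sync::mpsc::Sender;

type Digest = Vec<u8>;

// Placeholder helpers; the real implementations live in rdedup.
fn sha256(data: &[u8]) -> Digest { unimplemented!() }
fn compress(data: &[u8]) -> Vec<u8> { unimplemented!() }
fn encrypt(data: &[u8]) -> Vec<u8> { unimplemented!() }

/// What each `map(...)` call does for one chunk: digest, transform, hand the
/// bytes to the writer thread, and return the digest so it can be collected
/// in chunk order to build the index.
fn process_chunk(chunk: Vec<u8>, writer: &Sender<(Digest, Vec<u8>)>) -> Digest {
    let digest = sha256(&chunk);
    let payload = encrypt(&compress(&chunk));
    writer.send((digest.clone(), payload)).expect("writer thread gone");
    digest
}
```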
I hoped I could use `rayon` for this, but `rayon` expects an iterator that is already parallelizable, e.g. a `Vec<...>` or something like that. I have found no way to get a `par_iter` from `MyReader`; my reader is very single-threaded in nature.
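For contrast, this is the kind of input `rayon` is happy with; a small sketch using an in-memory `Vec`, which is exactly what I don't have:

```rust
extern crate rayon;

use rayon::prelude::*;

fn main() {
    // rayon parallelizes collections it can split, such as a Vec...
    let bufs: Vec<Vec<u8>> = vec![vec![1, 2, 3]; 1000];
    let sums: Vec<u64> = bufs
        .par_iter()
        .map(|buf| buf.iter().map(|&b| b as u64).sum::<u64>())
        .collect(); // results come back in the original order
    println!("{} sums", sums.len());
    // ...but a sequential `Iterator` like my reader offers no `par_iter()`.
}
```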
There is `simple_parallel`, but the documentation says it's not recommended for general use, and I want to make sure everything will just work.
I could just take an spmc queue implementation and a custom `thread_pool`, but I was hoping for an existing solution that is optimized and tested.
There's also `pipeliner`, but it doesn't support ordered map yet.
In general, preserving order is a pretty tough requirement as far as parallelization goes.
You could try to hand-make it with a typical fan-out/fan-in setup, sketched below.
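Here is a rough, illustrative sketch of such a hand-rolled fan-out/fan-in pipeline using only the standard library (the chunk data, worker count, and all names are arbitrary, not production code):

```rust
use std::collections::BTreeMap;
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use std::thread;

fn cpu_heavy(buf: Vec<u8>) -> u64 {
    buf.iter().map(|&b| b as u64).sum() // stand-in for the real work
}

fn main() {
    let n_workers = 4;

    // Fan-out: one work queue shared by all workers (Mutex-wrapped, since
    // std's mpsc Receiver cannot be cloned).
    let (work_tx, work_rx) = channel::<(usize, Vec<u8>)>();
    let work_rx = Arc::new(Mutex::new(work_rx));

    // Fan-in: workers send back (sequence number, result).
    let (result_tx, result_rx) = channel::<(usize, u64)>();

    let mut handles = Vec::new();
    for _ in 0..n_workers {
        let work_rx = Arc::clone(&work_rx);
        let result_tx = result_tx.clone();
        handles.push(thread::spawn(move || loop {
            // Grab the next chunk; stop when the producer hangs up.
            let job = work_rx.lock().unwrap().recv();
            match job {
                Ok((seq, buf)) => result_tx.send((seq, cpu_heavy(buf))).unwrap(),
                Err(_) => break,
            }
        }));
    }
    drop(result_tx); // consumer sees end-of-stream once all workers are done

    // Producer: tag each chunk with its sequence number.
    for seq in 0..100usize {
        work_tx.send((seq, vec![seq as u8; 1024])).unwrap();
    }
    drop(work_tx); // signal end of input to the workers

    // Consumer: buffer out-of-order results and emit them in sequence.
    let mut pending = BTreeMap::new();
    let mut next: usize = 0;
    for (seq, result) in result_rx {
        pending.insert(seq, result);
        while let Some(result) = pending.remove(&next) {
            println!("chunk {} -> {}", next, result);
            next += 1;
        }
    }

    for handle in handles {
        handle.join().unwrap();
    }
}
```

The `BTreeMap` on the consumer side is what restores the original chunk order; it only grows as large as the set of results that are currently out of order.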
Or you could raise the level of abstraction.
Of specific interest here: `Future`.

A `Future` represents the result of a computation, which may or may not have happened yet. A consumer receiving an ordered list of `Future`s can simply wait on each one, and let buffering occur naturally in the queue.
For bonus points, if you use a fixed-size queue, you automatically get back-pressure on the producer.
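As a small illustration of that fixed-size-queue idea, std's `sync_channel` blocks the sender once its buffer is full (the sizes and values here are arbitrary):

```rust
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::Duration;

fn main() {
    // A bounded channel: at most 2 items may be in flight at once.
    let (tx, rx) = sync_channel::<u32>(2);

    let producer = thread::spawn(move || {
        for i in 0..10 {
            // Blocks here once 2 items are queued: back-pressure on the producer.
            tx.send(i).unwrap();
            println!("produced {}", i);
        }
    });

    for item in rx {
        thread::sleep(Duration::from_millis(50)); // slow consumer
        println!("consumed {}", item);
    }

    producer.join().unwrap();
}
```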
And therefore I would recommend building something on top of `CpuPool`.
The setup is going to be:
extern crate futures;
extern crate futures_cpupool;

use futures::Future;
use futures_cpupool::CpuPool;
use std::sync::mpsc::{Receiver, Sender};

fn produce(sender: Sender<...>) {
    // One pool sized to the number of cores; it takes care of scheduling
    // the CPU-heavy work across its threads.
    let pool = CpuPool::new_num_cpus();

    for chunk in reader {
        // Off-load the work; the future resolves on a pool thread.
        let future = pool.spawn_fn(|| /* do work */);
        // Futures are sent in chunk order, so results are consumed in order.
        sender.send(future).expect("Consumer hung up");
    }

    // Dropping the sender signals there's no more work to the consumer.
}

fn consume(receiver: Receiver<...>) {
    while let Ok(future) = receiver.recv() {
        // Blocks until this particular chunk's result is ready.
        let item = future.wait().expect("Computation Error?");
        /* do something with item */
    }
}

fn main() {
    let (sender, receiver) = std::sync::mpsc::channel();
    std::thread::spawn(move || consume(receiver));
    produce(sender);
}