I'm trying to speed up an expensive computation on a large vector by using threads. My function consumes a vector, computes a vector of new values (it doesn't aggregate, but input order has to be retained), and returns it. However, I'm struggling to figure out how to spawn threads, assign vector slices to each, and then collect and combine the results.
// tunable
const NUMTHREADS: i32 = 4;

fn f(val: i32) -> i32 {
    // expensive computation
    let res = val + 1;
    res
}

fn main() {
    // choose an odd number of elements
    let orig = (1..14).collect::<Vec<i32>>();
    let mut result: Vec<Vec<i32>> = vec![];
    let mut flat: Vec<i32> = Vec::with_capacity(orig.len());
    // split into slices
    for chunk in orig.chunks(orig.len() / NUMTHREADS as usize) {
        result.push(chunk.iter().map(|&digit| f(digit)).collect());
    }
    // flatten result vector
    for subvec in result.iter() {
        for elem in subvec.iter() {
            flat.push(elem.to_owned());
        }
    }
    println!("Flattened result: {:?}", flat);
}
The threaded computation should be taking place between "for chunk ..." and "// flatten ...", but I can't find many simple examples of spawning x threads, assigning chunks sequentially, and returning the newly-computed vector out of the thread and into a container so it can be flattened. Do I have to wrap orig.chunks() in an Arc and manually grab each chunk in a loop? Do I have to pass f into each thread? Will I have to use a B-tree to ensure that input and output order match? Can I just use simple_parallel?
Well, this is an ideal application for the unstable thread::scoped():
#![feature(scoped)]
use std::thread::{self, JoinGuard};

// tunable
const NUMTHREADS: i32 = 4;

fn f(val: i32) -> i32 {
    // expensive computation
    let res = val + 1;
    res
}

fn main() {
    // choose an odd number of elements
    let orig: Vec<i32> = (1..14).collect();
    let mut guards: Vec<JoinGuard<Vec<i32>>> = vec![];
    // split into slices
    for chunk in orig.chunks(orig.len() / NUMTHREADS as usize) {
        let g = thread::scoped(move || chunk.iter().cloned().map(f).collect());
        guards.push(g);
    }
    // collect the results
    let mut result: Vec<i32> = Vec::with_capacity(orig.len());
    for g in guards {
        result.extend(g.join().into_iter());
    }
    println!("Flattened result: {:?}", result);
}
It is unstable and likely won't be stabilized in this form because it has an inherent flaw (you can find more here). As far as I can see, simple_parallel is just an extension of this approach - it hides the fiddling with JoinGuards and can also be used in stable Rust (probably with some unsafety, I believe). Its docs suggest it is not recommended for general use, however.
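For completeness: later Rust (1.63+) stabilized a sound replacement, std::thread::scope, which guarantees all spawned threads are joined before the scope returns, so they may borrow the parent's data directly without JoinGuards. A minimal sketch (the parallel_map helper name is mine, not from the original code), assuming the input has at least NUMTHREADS elements so the chunk size is nonzero:

```rust
use std::thread;

// tunable
const NUMTHREADS: i32 = 4;

fn f(val: i32) -> i32 {
    // expensive computation
    val + 1
}

fn parallel_map(orig: &[i32]) -> Vec<i32> {
    // thread::scope joins every spawned thread before it returns,
    // so the closures may safely borrow `orig` without Arc or cloning
    thread::scope(|s| {
        let handles: Vec<_> = orig
            .chunks(orig.len() / NUMTHREADS as usize)
            .map(|chunk| s.spawn(move || chunk.iter().cloned().map(f).collect::<Vec<i32>>()))
            .collect();
        // joining handles in spawn order keeps the output in input order
        handles
            .into_iter()
            .flat_map(|h| h.join().unwrap())
            .collect()
    })
}

fn main() {
    // choose an odd number of elements
    let orig: Vec<i32> = (1..14).collect();
    println!("Flattened result: {:?}", parallel_map(&orig));
}
```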
Of course, you can use thread::spawn(), but then you will need to clone each chunk so it can be moved into each thread:
use std::thread::{self, JoinHandle};

// tunable
const NUMTHREADS: i32 = 4;

fn f(val: i32) -> i32 {
    // expensive computation
    let res = val + 1;
    res
}

fn main() {
    // choose an odd number of elements
    let orig: Vec<i32> = (1..14).collect();
    let mut guards: Vec<JoinHandle<Vec<i32>>> = vec![];
    // split into slices
    for chunk in orig.chunks(orig.len() / NUMTHREADS as usize) {
        let chunk = chunk.to_owned();
        let g = thread::spawn(move || chunk.into_iter().map(f).collect());
        guards.push(g);
    }
    // collect the results
    let mut result: Vec<i32> = Vec::with_capacity(orig.len());
    for g in guards {
        result.extend(g.join().unwrap().into_iter());
    }
    println!("Flattened result: {:?}", result);
}
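As for the question's Arc idea: the per-chunk cloning can also be avoided on stable Rust by sharing the whole vector behind an Arc and handing each thread an index range instead of a slice. This is a sketch under that assumption (the parallel_map_arc helper and the range arithmetic are mine, not from the original); joining handles in spawn order keeps the output in input order, so no B-tree is needed:

```rust
use std::sync::Arc;
use std::thread::{self, JoinHandle};

// tunable
const NUMTHREADS: i32 = 4;

fn f(val: i32) -> i32 {
    // expensive computation
    val + 1
}

fn parallel_map_arc(orig: Arc<Vec<i32>>) -> Vec<i32> {
    // at least 1, so short inputs still make progress
    let chunk_len = (orig.len() / NUMTHREADS as usize).max(1);
    let mut handles: Vec<JoinHandle<Vec<i32>>> = vec![];
    // hand each thread a (start, end) range instead of a cloned chunk
    let mut start = 0;
    while start < orig.len() {
        let end = (start + chunk_len).min(orig.len());
        let shared = Arc::clone(&orig); // cheap pointer clone, not a data copy
        handles.push(thread::spawn(move || {
            shared[start..end].iter().cloned().map(f).collect()
        }));
        start = end;
    }
    // joining handles in spawn order keeps the output in input order
    let mut result: Vec<i32> = Vec::with_capacity(orig.len());
    for h in handles {
        result.extend(h.join().unwrap());
    }
    result
}

fn main() {
    // choose an odd number of elements
    let orig: Arc<Vec<i32>> = Arc::new((1..14).collect());
    println!("Flattened result: {:?}", parallel_map_arc(orig));
}
```

The trade-off versus cloning each chunk is one atomic reference-count bump per thread instead of one buffer copy per chunk, at the cost of doing the range bookkeeping by hand.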