 

Consume non-overlapping vector chunks, and combine results

I'm trying to speed up an expensive computation on a large vector by using threads. My function consumes a vector, computes a vector of new values (it doesn't aggregate, but input order has to be retained), and returns it. However, I'm struggling to figure out how to spawn threads, assign vector slices to each, and then collect and combine the results.

// tunable
const NUMTHREADS: i32 = 4;

fn f(val: i32) -> i32 {
    // expensive computation
    let res = val + 1;
    res
}

fn main() {
    // choose an odd number of elements
    let orig = (1..14).collect::<Vec<i32>>();
    let mut result: Vec<Vec<i32>> = vec!();
    let mut flat: Vec<i32> = Vec::with_capacity(orig.len());
    // split into slices
    for chunk in orig.chunks(orig.len() / NUMTHREADS as usize) {
        result.push(chunk.iter().map(|&digit| f(digit)).collect());
    }
    // flatten result vector
    for subvec in result.iter() {
        for elem in subvec.iter() {
            flat.push(elem.to_owned());
        }
    }
    println!("Flattened result: {:?}", flat);
}

The threaded computation should take place between the for chunk… loop and the // flatten … step, but I can't find many simple examples of spawning x threads, assigning chunks to them sequentially, and returning each newly-computed vector out of its thread and into a container so that it can be flattened. Do I have to wrap orig.chunks() in an Arc, and manually grab each chunk in a loop? Do I have to pass f into each thread? Will I have to use a B-Tree to ensure that input and output order match? Can I just use simple_parallel?

asked Nov 01 '22 by urschrei

1 Answer

Well, this is an ideal application for the unstable thread::scoped():

#![feature(scoped)]
use std::thread::{self, JoinGuard};

// tunable
const NUMTHREADS: i32 = 4;

fn f(val: i32) -> i32 {
    // expensive computation
    let res = val + 1;
    res
}

fn main() {
    // choose an odd number of elements
    let orig: Vec<i32> = (1..14).collect();

    let mut guards: Vec<JoinGuard<Vec<i32>>> = vec![];

    // split into slices
    for chunk in orig.chunks(orig.len() / NUMTHREADS as usize) {
        let g = thread::scoped(move || chunk.iter().cloned().map(f).collect());
        guards.push(g);
    }

    // collect the results
    let mut result: Vec<i32> = Vec::with_capacity(orig.len());
    for g in guards {
        result.extend(g.join().into_iter());
    }

    println!("Flattened result: {:?}", result);
}

It is unstable and likely won't be stabilized in this form because it has an inherent flaw (you can find more here). As far as I can see, simple_parallel is just an extension of this approach: it hides the fiddling with JoinGuards and can also be used on stable Rust (probably with some unsafe code, I believe). However, it is not recommended for general use, as its docs suggest.

Of course, you can use thread::spawn(), but then you will need to clone each chunk so that it can be moved into its own thread:

use std::thread::{self, JoinHandle};

// tunable
const NUMTHREADS: i32 = 4;

fn f(val: i32) -> i32 {
    // expensive computation
    let res = val + 1;
    res
}

fn main() {
    // choose an odd number of elements
    let orig: Vec<i32> = (1..14).collect();

    let mut guards: Vec<JoinHandle<Vec<i32>>> = vec![];

    // split into slices
    for chunk in orig.chunks(orig.len() / NUMTHREADS as usize) {
        let chunk = chunk.to_owned();
        let g = thread::spawn(move || chunk.into_iter().map(f).collect());
        guards.push(g);
    }

    // collect the results
    let mut result: Vec<i32> = Vec::with_capacity(orig.len());
    for g in guards {
        result.extend(g.join().unwrap().into_iter());
    }

    println!("Flattened result: {:?}", result);
}
answered Nov 27 '22 by Vladimir Matveev