Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do I run parallel threads of computation on a partitioned array?

I'm trying to distribute an array across threads and have the threads sum up portions of the array in parallel. I want thread 0 to sum elements 0 1 2 and Thread 1 sum elements 3 4 5. Thread 2 to sum 6 and 7. and Thread 3 to sum 8 and 9.

I'm new to Rust but have coded with C/C++/Java before. I've literally thrown everything and the garbage sink at this program and I was hoping I could receive some guidance.

Sorry my code is sloppy but I will clean it up when it is a finished product. Please ignore all poorly named variables/inconsistent spacing/etc.

use std::io;
use std::rand;
use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc;
use std::thread::Thread;

static NTHREADS: usize = 4;
static NPROCS: usize = 10;

fn main() {
    let mut a = [0; 10]; // a: [i32; 10]
    let mut endpoint = a.len() / NTHREADS;
    let mut remElements = a.len() % NTHREADS;

    for x in 0..a.len() {
        let secret_number = (rand::random::<i32>() % 100) + 1;
        a[x] = secret_number;
        println!("{}", a[x]);
    }
    let mut b = a;
    let mut x = 0;

    check_sum(&mut a);
    // serial_sum(&mut b);

    // Channels have two endpoints: the `Sender<T>` and the `Receiver<T>`,
    // where `T` is the type of the message to be transferred
    // (type annotation is superfluous)
    let (tx, rx): (Sender<i32>, Receiver<i32>) = mpsc::channel();
    let mut scale: usize = 0;

    for id in 0..NTHREADS {
        // The sender endpoint can be copied
        let thread_tx = tx.clone();
        // Each thread will send its id via the channel

        Thread::spawn(move || {
            // The thread takes ownership over `thread_tx`
            // Each thread queues a message in the channel
            let numTougherThreads: usize = NPROCS % NTHREADS;
            let numTasksPerThread: usize = NPROCS / NTHREADS;
            let mut lsum = 0;

            if id < numTougherThreads {
                let mut q = numTasksPerThread+1;
                lsum = 0;

                while q > 0 {
                    lsum = lsum + a[scale];
                    scale+=1;
                    q = q-1;
                }
                println!("Less than numToughThreads lsum: {}", lsum);
            }
            if id >= numTougherThreads {
                let mut z = numTasksPerThread;
                lsum = 0;

                while z > 0 {
                    lsum = lsum + a[scale];
                    scale +=1;
                    z = z-1;
                }    
                println!("Greater than numToughthreads lsum: {}", lsum);
            }
            // Sending is a non-blocking operation, the thread will continue
            // immediately after sending its message
            println!("thread {} finished", id);
            thread_tx.send(lsum).unwrap();
        });
    }

    // Here, all the messages are collected
    let mut globalSum = 0;
    let mut ids = Vec::with_capacity(NTHREADS);
    for _ in 0..NTHREADS {
        // The `recv` method picks a message from the channel
        // `recv` will block the current thread if there no messages      available
        ids.push(rx.recv());
    }
    println!("Global Sum: {}", globalSum);
    // Show the order in which the messages were sent

    println!("ids: {:?}", ids);
}

fn check_sum (arr: &mut [i32]) {
    let mut sum = 0;
    let mut i = 0;
    let mut size = arr.len();
    loop {
        sum += arr[i];
        i+=1;
        if i == size { break; }
    }
    println!("CheckSum is {}", sum);
}

So far I've gotten it to do this much. Can't figure out why threads 0 and 1 have the same sum as well as 2 and 3 doing the same thing:

 -5
 -49
 -32
 99
 45
 -65
 -64
 -29
 -56
 65
 CheckSum is -91
 Greater than numTough lsum: -54
 thread 2 finished
 Less than numTough lsum: -86
 thread 1 finished
 Less than numTough lsum: -86
 thread 0 finished
 Greater than numTough lsum: -54
 thread 3 finished
 Global Sum: 0
 ids: [Ok(-86), Ok(-86), Ok(-54), Ok(-54)]

I managed to rewrite it to work with even numbers by using the below code.

    while q > 0 {
        if id*s+scale == a.len() { break; }
        lsum = lsum + a[id*s+scale];
        scale +=1;
        q = q-1;
    }
    println!("Less than numToughThreads lsum: {}", lsum);
}
if id >= numTougherThreads {
    let mut z = numTasksPerThread;
    lsum = 0;
    let mut scale = 0;

    while z > 0 {
        if id*numTasksPerThread+scale == a.len() { break; }
        lsum = lsum + a[id*numTasksPerThread+scale];
        scale = scale + 1;
        z = z-1;
    }
like image 268
Scoshe Rosh Avatar asked Feb 19 '15 05:02

Scoshe Rosh


People also ask

Can you run threads in parallel?

On a system with more than one processor or CPU cores (as is common with modern processors), multiple processes or threads can be executed in parallel.

Can multiple threads run the same function?

A thread can execute a function in parallel with other threads. Each thread shares the same code, data, and files while they have their own stack and registers.

Can multiple threads read the same array?

The answer is no. Each array element has a region of memory reserved for it alone within the region attributed the overall array. Modifications of different elements therefore do not write to any of the same memory locations.

Is parallel programming the same as multithreading?

Parallel programming is a broad concept. It can describe many types of processes running on the same machine or on different machines. Multithreading specifically refers to the concurrent execution of more than one sequential set (thread) of instructions.


2 Answers

Welcome to Rust! :)

Yeah at first I didn't realize each thread gets it's own copy of scale

Not only that! It also gets its own copy of a!

What you are trying to do could look like the following code. I guess it's easier for you to see a complete working example since you seem to be a Rust beginner and asked for guidance. I deliberately replaced [i32; 10] with a Vec since a Vec is not implicitly Copyable. It requires an explicit clone(); we cannot copy it by accident. Please note all the larger and smaller differences. The code also got a little more functional (less mut). I commented most of the noteworthy things:

extern crate rand;

use std::sync::Arc;
use std::sync::mpsc;
use std::thread;

const NTHREADS: usize = 4; // I replaced `static` by `const`

// gets used for *all* the summing :)
fn sum<I: Iterator<Item=i32>>(iter: I) -> i32 {
    let mut s = 0;
    for x in iter {
        s += x;
    }
    s
}

fn main() {
    // We don't want to clone the whole vector into every closure.
    // So we wrap it in an `Arc`. This allows sharing it.
    // I also got rid of `mut` here by moving the computations into
    // the initialization.
    let a: Arc<Vec<_>> =
        Arc::new(
            (0..10)
                .map(|_| {
                    (rand::random::<i32>() % 100) + 1
                })
                .collect()
        );

    let (tx, rx) = mpsc::channel(); // types will be inferred

    { // local scope, we don't need the following variables outside
        let num_tasks_per_thread = a.len() / NTHREADS; // same here
        let num_tougher_threads = a.len() % NTHREADS; // same here
        let mut offset = 0;
        for id in 0..NTHREADS {
            let chunksize =
                if id < num_tougher_threads {
                    num_tasks_per_thread + 1
                } else {
                    num_tasks_per_thread
                };
            let my_a = a.clone();  // refers to the *same* `Vec`
            let my_tx = tx.clone();
            thread::spawn(move || {
                let end = offset + chunksize;
                let partial_sum =
                    sum( (&my_a[offset..end]).iter().cloned() );
                my_tx.send(partial_sum).unwrap();
            });
            offset += chunksize;
        }
    }

    // We can close this Sender
    drop(tx);

    // Iterator magic! Yay! global_sum does not need to be mutable
    let global_sum = sum(rx.iter());
    println!("global sum via threads    : {}", global_sum);
    println!("global sum single-threaded: {}", sum(a.iter().cloned()));
}
like image 76
sellibitze Avatar answered Sep 28 '22 08:09

sellibitze


Using a crate like crossbeam you can write this code:

use crossbeam; // 0.7.3
use rand::distributions::{Distribution, Uniform}; // 0.7.3

const NTHREADS: usize = 4;

fn random_vec(length: usize) -> Vec<i32> {
    let step = Uniform::new_inclusive(1, 100);
    let mut rng = rand::thread_rng();
    step.sample_iter(&mut rng).take(length).collect()
}

fn main() {
    let numbers = random_vec(10);
    let num_tasks_per_thread = numbers.len() / NTHREADS;

    crossbeam::scope(|scope| {
        // The `collect` is important to eagerly start the threads!
        let threads: Vec<_> = numbers
            .chunks(num_tasks_per_thread)
            .map(|chunk| scope.spawn(move |_| chunk.iter().cloned().sum::<i32>()))
            .collect();

        let thread_sum: i32 = threads.into_iter().map(|t| t.join().unwrap()).sum();
        let no_thread_sum: i32 = numbers.iter().cloned().sum();

        println!("global sum via threads    : {}", thread_sum);
        println!("global sum single-threaded: {}", no_thread_sum);
    })
    .unwrap();
}

Scoped threads allow you to pass in a reference that is guaranteed to outlive the thread. You can then use the return value of the thread directly, skipping channels (which are great, just not needed here!).

I followed How can I generate a random number within a range in Rust? to generate the random numbers. I also changed it to be the range [1,100], as I think that's what you meant. However, your original code is actually [-98,100], which you could also do.

Iterator::sum is used to sum up an iterator of numbers.

I threw in some rough performance numbers of the thread work, ignoring the vector construction, working on 100,000,000 numbers, using Rust 1.34 and compiling in release mode:

| threads | time (ns) | relative time (%) |
|---------+-----------+-------------------|
|       1 |  33824667 |            100.00 |
|       2 |  16246549 |             48.03 |
|       3 |  16709280 |             49.40 |
|       4 |  14263326 |             42.17 |
|       5 |  14977901 |             44.28 |
|       6 |  12974001 |             38.36 |
|       7 |  13321743 |             39.38 |
|       8 |  13370793 |             39.53 |

See also:

  • How can I pass a reference to a stack variable to a thread?
like image 28
Shepmaster Avatar answered Sep 28 '22 10:09

Shepmaster