
How can I parallelize this code?

I originally wrote this in Ruby and then found out that MRI doesn't support parallel execution at all. So I rewrote it in Rust, but I am having trouble with the ownership of various pieces.

extern crate rand;
extern crate csv;
extern crate num_cpus;

use std::fs::File;
use csv::Reader;
use rand::{thread_rng, sample};
use std::thread;
use std::str::from_utf8;
use std::io::{self, Write};
use csv::index::{Indexed, create_index};

fn met_n_in_common(n:usize,csv:&mut Reader<File>)->usize{
    csv.byte_records().map(|r| if(from_utf8(r.unwrap().get(n).unwrap()).unwrap() == "TRUE"){1}else{0}).fold(0usize, |sum, i| sum + i)
}

fn mets_in_common(csv:&mut Reader<File>,current_set_length:usize)->usize {
    (0..csv.headers().unwrap().len()).map(|i| if(i == 0){0}else{met_n_in_common(i,csv)} ).filter(|&e| e==current_set_length ).count()
}

fn main() {

    let csv_s = || csv::Reader::from_file("/Users/camdennarzt/Documents/All 7000 series-Table 1-1-1-3.csv").unwrap();
    let mut csv = csv_s();
    let mut index_data = io::Cursor::new(Vec::new());
    create_index(csv_s(), index_data.by_ref()).unwrap();
    let mut index = Indexed::open(csv_s(), index_data).unwrap();

    let mut tried_indices = Vec::new();
    let mut threads : Vec<_> = (0..num_cpus::get()).map(|i|{
        thread::spawn(move || {
            let mut best_set : Vec<Vec<String>> = Vec::new();
            let mut best_count = 0;
            let mut rng = thread_rng();
            let mut indices = Vec::new();
            let limit = 2usize.pow(10)/num_cpus::get();
            for _ in (0..limit) {
                while {
                    let count = *sample(&mut rng, 13..83, 1).first().unwrap();
                    indices = sample(&mut rng, 1..83, count);
                    tried_indices.contains(&indices)
                }{}
                tried_indices.push(indices.to_owned());

                let current_set:Vec<_> = indices.iter().map(|&i|{
                    index.seek(i).unwrap();
                    index.records().next().unwrap().unwrap()
                }).collect();
                let current_count = mets_in_common(&mut csv,current_set.len());
                if (current_count > best_count){
                    best_count = current_count;
                    best_set = current_set;
                }
            }
            (best_count,best_set.iter().map(|r| *r.first().unwrap()).collect::<Vec<String>>())
        })
    }).collect();
}

Specifically when I compile this (rust 1.2 stable) I get:

-*- mode: compilation; default-directory: "~/Developer/Rust/optimal_subset_finder/src/" -*-
Compilation started at Wed Aug 19 14:55:10

cargo build
   Compiling optimal_subset_finder v0.1.0 (file:///Users/camdennarzt/Developer/Rust/optimal_subset_finder)
main.rs:31:23: 56:10 error: cannot move out of captured outer variable in an `FnMut` closure
main.rs:31         thread::spawn(move || {
main.rs:32             let mut best_set : Vec<Vec<String>> = Vec::new();
main.rs:33             let mut best_count = 0;
main.rs:34             let mut rng = thread_rng();
main.rs:35             let mut indices = Vec::new();
main.rs:36             let limit = 2usize.pow(10)/num_cpus::get();
           ...
note: in expansion of closure expansion
main.rs:31:23: 56:10 note: expansion site
note: in expansion of closure expansion
main.rs:30:57: 57:6 note: expansion site
main.rs:31:23: 56:10 error: cannot move out of captured outer variable in an `FnMut` closure
main.rs:31         thread::spawn(move || {
main.rs:32             let mut best_set : Vec<Vec<String>> = Vec::new();
main.rs:33             let mut best_count = 0;
main.rs:34             let mut rng = thread_rng();
main.rs:35             let mut indices = Vec::new();
main.rs:36             let limit = 2usize.pow(10)/num_cpus::get();
           ...
note: in expansion of closure expansion
main.rs:31:23: 56:10 note: expansion site
note: in expansion of closure expansion
main.rs:30:57: 57:6 note: expansion site
main.rs:31:23: 56:10 error: cannot move out of captured outer variable in an `FnMut` closure
main.rs:31         thread::spawn(move || {
main.rs:32             let mut best_set : Vec<Vec<String>> = Vec::new();
main.rs:33             let mut best_count = 0;
main.rs:34             let mut rng = thread_rng();
main.rs:35             let mut indices = Vec::new();
main.rs:36             let limit = 2usize.pow(10)/num_cpus::get();
           ...
note: in expansion of closure expansion
main.rs:31:23: 56:10 note: expansion site
note: in expansion of closure expansion
main.rs:30:57: 57:6 note: expansion site
main.rs:55:49: 55:68 error: cannot move out of borrowed content
main.rs:55             (best_count,best_set.iter().map(|r| *r.first().unwrap()).collect::<Vec<String>>())
                                                           ^~~~~~~~~~~~~~~~~~~
note: in expansion of closure expansion
main.rs:55:45: 55:68 note: expansion site
note: in expansion of closure expansion
main.rs:31:23: 56:10 note: expansion site
note: in expansion of closure expansion
main.rs:30:57: 57:6 note: expansion site
error: aborting due to 4 previous errors
Could not compile `optimal_subset_finder`.

To learn more, run the command again with --verbose.

Compilation exited abnormally with code 101 at Wed Aug 19 14:55:10

If I comment out the threading completely then it compiles and runs. But I have not been able to parse the docs on getting the threading working. And the errors aren't particularly helpful when it comes to knowing what to do to fix this either.

Camden Narzt asked Dec 31 '25 23:12


1 Answer

There are several problems with your code.

First, the last error you show (cannot move out of borrowed content) is caused by trying to obtain a String out of a &Vec<String>. This is not possible without cloning, because it would require moving the String out of the &Vec<String>, and you can't move out of borrowed data. Since best_set is used only inside the spawned thread and is dropped right after you return the data from it, you can safely consume it using into_iter():

best_set.into_iter()
    // Vec has no take(); turn each record into an iterator first
    .flat_map(|r| r.into_iter().take(1))
    .collect::<Vec<String>>()
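
As a self-contained illustration of the same idea, here is a minimal sketch that consumes a Vec<Vec<String>> and keeps the first field of each record. The function name first_fields and the sample data are made up for this example; only the into_iter()/flat_map()/take() chain comes from the snippet above.

```rust
/// Consumes the outer vector and keeps the first field of every record.
/// (`first_fields` is a hypothetical helper name used only for this sketch.)
fn first_fields(best_set: Vec<Vec<String>>) -> Vec<String> {
    best_set
        .into_iter()
        // Each `r` is an owned Vec<String>, so its elements can be moved out
        // without cloning. Empty records simply contribute nothing.
        .flat_map(|r| r.into_iter().take(1))
        .collect()
}

fn main() {
    // Made-up sample records standing in for CSV rows.
    let best_set = vec![
        vec!["a1".to_string(), "TRUE".to_string()],
        vec!["b2".to_string(), "FALSE".to_string()],
    ];
    let names = first_fields(best_set);
    println!("{:?}", names);
}
```

Because best_set is moved into the function, it cannot be used afterwards, which matches the situation in the thread body where it is about to be dropped anyway.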

This is the least of the problems, however. There are several serious errors in the concurrency part.

First, you are trying to use index in multiple threads directly. This can't be done, because index is owned by the parent thread and lives on its stack. The parent thread can terminate before its child threads (and this is indeed what would happen in your program if it compiled), in which case index would be destroyed and the child threads would access garbage data. The "cannot move out of captured outer variable in an `FnMut` closure" errors point at the same issue: the closure passed to map runs once per thread, so the move closure inside it cannot take ownership of the same outer variable each time. To fix this, wrap index in an Arc together with some kind of synchronization, such as a Mutex. Without the mutex, several threads would write to the same data without synchronization, a textbook data race that leads to undefined behavior.

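use std::sync::{Arc, Mutex};

let index = Arc::new(Mutex::new(Indexed::open(csv_s(), index_data).unwrap()));

...
// Clone the Arc (not the index itself) so each thread owns its own handle.
let index = index.clone();
thread::spawn(move || {
    ...

    let current_set:Vec<_> = {
        // lock() returns a Result; the guard must be mutable because seek() mutates the index.
        let mut index = index.lock().unwrap();
        indices.iter().map(|&i| {
            index.seek(i).unwrap();
            index.records().next().unwrap().unwrap()
        }).collect()
    };
    ...
});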
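
Since the snippet above depends on the csv crate, here is a standard-library-only sketch of the same Arc<Mutex<...>> pattern. FakeIndex, fetch_rows and run are hypothetical names invented for this example; FakeIndex only imitates a reader whose seek needs mutable access and is not the csv crate's API.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for the indexed reader: `seek` needs `&mut self`.
struct FakeIndex {
    rows: Vec<String>,
    pos: usize,
}

impl FakeIndex {
    fn seek(&mut self, i: usize) {
        self.pos = i;
    }
    fn current(&self) -> Option<String> {
        self.rows.get(self.pos).cloned()
    }
}

/// Locks the shared index once, reads the requested rows, then unlocks.
fn fetch_rows(index: &Mutex<FakeIndex>, indices: &[usize]) -> Vec<String> {
    // The guard is released when it goes out of scope at the end of this function.
    let mut guard = index.lock().unwrap();
    indices
        .iter()
        .filter_map(|&i| {
            guard.seek(i);
            guard.current()
        })
        .collect()
}

/// Spawns `workers` threads that all read through the same shared index.
fn run(workers: usize) -> Vec<Vec<String>> {
    let rows: Vec<String> = (0..10).map(|i| format!("row{}", i)).collect();
    let index = Arc::new(Mutex::new(FakeIndex { rows, pos: 0 }));
    let handles: Vec<_> = (0..workers)
        .map(|w| {
            // Each thread gets its own Arc handle to the same index.
            let index = Arc::clone(&index);
            thread::spawn(move || fetch_rows(&index, &[w, w + 1]))
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    for (w, rows) in run(4).iter().enumerate() {
        println!("worker {}: {:?}", w, rows);
    }
}
```

Holding the lock across the whole seek-and-read sequence matters here: if another thread could seek between one thread's seek and its read, that thread would get the wrong row.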

You will need to do the same thing with tried_indices: you push data from multiple threads into the same vector, so you need some kind of synchronization to do it safely. Be careful with the scope so that you don't hold the lock longer than necessary. Remember, the mutex is released when the guard returned by the lock() method goes out of scope.
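
A minimal sketch of that idea for tried_indices, using only the standard library. The helper names try_claim and claims_for_same_candidate are invented for this example, and doing the "already tried?" check and the push under a single lock is one possible design, not something the original code does.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Records `candidate` if it has not been tried yet.
/// Returns true if this caller was the first to claim it.
fn try_claim(tried: &Mutex<Vec<Vec<usize>>>, candidate: &[usize]) -> bool {
    // Check and push happen under one lock, so two threads cannot both
    // conclude that the same candidate is new.
    let mut guard = tried.lock().unwrap();
    if guard.iter().any(|t| t.as_slice() == candidate) {
        false
    } else {
        guard.push(candidate.to_vec());
        true
    }
    // The guard is dropped here, releasing the mutex before any heavy work.
}

/// Lets `workers` threads race for the same candidate and counts the winners.
fn claims_for_same_candidate(workers: usize) -> usize {
    let tried: Arc<Mutex<Vec<Vec<usize>>>> = Arc::new(Mutex::new(Vec::new()));
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let tried = Arc::clone(&tried);
            thread::spawn(move || try_claim(&tried, &[1, 2, 3]))
        })
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .filter(|&won| won)
        .count()
}

fn main() {
    println!("winning claims: {}", claims_for_same_candidate(8));
}
```

The lock is held only for the lookup and the push; the expensive part of each iteration would run after try_claim returns, with the mutex already released.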

The last problem I can see, and the most serious one, is that you use the same csv from all of the spawned threads. Not only is this a data race because of the mutable access, it is also wrong because the reader is by definition an exhaustible source of data. If you read it from multiple threads, even leaving concurrency issues aside, each thread would get unpredictable fragments from different parts of whatever it reads from. Therefore, even putting the reader into a mutex won't solve the problem.

The easiest solution, I think, would be to create a separate reader for each thread. Fortunately, you already have a function to create readers, so just use it before spawning the thread:

let mut csv = csv_s();
thread::spawn(move || {
    ...
});
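
To see why one reader per thread works, here is a standard-library-only sketch. make_reader plays the role of csv_s but is a hypothetical factory over an in-memory string, and count_true is a simplified stand-in for the counting logic, not the original functions.

```rust
use std::io::{BufRead, Cursor};
use std::thread;

// Made-up sample data standing in for the CSV file.
const DATA: &str = "id,a\nx,TRUE\ny,FALSE\nz,TRUE\n";

/// Hypothetical factory in the role of `csv_s`: every call returns a fresh
/// reader positioned at the start of the data.
fn make_reader() -> Cursor<&'static [u8]> {
    Cursor::new(DATA.as_bytes())
}

/// Consumes the reader and counts data lines whose last field is TRUE.
fn count_true<R: BufRead>(reader: R) -> usize {
    reader
        .lines()
        .skip(1) // skip the header line
        .filter(|l| l.as_ref().map(|s| s.ends_with("TRUE")).unwrap_or(false))
        .count()
}

/// Gives each worker its own reader, so each one sees the complete data.
fn counts_per_thread(workers: usize) -> Vec<usize> {
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            // Created in the parent, then moved into exactly one thread.
            let reader = make_reader();
            thread::spawn(move || count_true(reader))
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    println!("{:?}", counts_per_thread(3));
}
```

Each reader is exhausted independently, so no thread can steal records from another, and no lock is needed because nothing is shared.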

Finally, it appears that you're not using the computation results returned from threads, but you probably know that already.
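
If you do want the results, one way is to join every handle and keep the best one. This is only a sketch under assumptions: best_of is an invented name, and the closure body is placeholder work returning the same (count, set) shape as your thread body.

```rust
use std::thread;

/// Runs `workers` threads and returns the result with the highest count.
fn best_of(workers: usize) -> (usize, Vec<String>) {
    let handles: Vec<_> = (0..workers)
        .map(|w| {
            thread::spawn(move || {
                // Placeholder work: a real worker would compute
                // (best_count, best_set) here.
                (w * 10, vec![format!("set-from-worker-{}", w)])
            })
        })
        .collect();

    handles
        .into_iter()
        // join() blocks until the thread finishes and yields its return value.
        .map(|h| h.join().expect("worker thread panicked"))
        .max_by_key(|result| result.0)
        .expect("at least one worker is required")
}

fn main() {
    let (count, set) = best_of(4);
    println!("best count {} from {:?}", count, set);
}
```

Joining also keeps the parent thread alive until all workers are done, so main no longer exits while they are still running.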

Vladimir Matveev answered Jan 03 '26 12:01


