How can multiple threads share an iterator?

I've been working on a function that copies a bunch of files from a source to a destination using Rust and threads. I'm having trouble making the threads share the iterator. I'm still not used to the borrowing system:

extern crate libc;
extern crate num_cpus;

use libc::{c_char, size_t};
use std::thread;
use std::fs::copy;

fn python_str_array_2_str_vec<T, U, V>(_: T, _: U) -> V {
    unimplemented!()
}

#[no_mangle]
pub extern "C" fn copyFiles(
    sources: *const *const c_char,
    destinies: *const *const c_char,
    array_len: size_t,
) {
    let src: Vec<&str> = python_str_array_2_str_vec(sources, array_len);
    let dst: Vec<&str> = python_str_array_2_str_vec(destinies, array_len);
    let mut iter = src.iter().zip(dst);
    let num_threads = num_cpus::get();
    let threads = (0..num_threads).map(|_| {
        thread::spawn(|| while let Some((s, d)) = iter.next() {
            copy(s, d);
        })
    });
    for t in threads {
        t.join();
    }
}

fn main() {}

I'm getting these compilation errors, which I have not been able to solve:

error[E0597]: `src` does not live long enough
  --> src/main.rs:20:20
   |
20 |     let mut iter = src.iter().zip(dst);
   |                    ^^^ does not live long enough
...
30 | }
   | - borrowed value only lives until here
   |
   = note: borrowed value must be valid for the static lifetime...

error[E0373]: closure may outlive the current function, but it borrows `**iter`, which is owned by the current function
  --> src/main.rs:23:23
   |
23 |         thread::spawn(|| while let Some((s, d)) = iter.next() {
   |                       ^^                          ---- `**iter` is borrowed here
   |                       |
   |                       may outlive borrowed value `**iter`
   |
help: to force the closure to take ownership of `**iter` (and any other referenced variables), use the `move` keyword, as shown:
   |         thread::spawn(move || while let Some((s, d)) = iter.next() {

I've seen the following questions already:

"Value does not live long enough when using multiple threads": I'm not using chunks. I would like to try sharing an iterator between the threads, although creating chunks to pass to the threads would be the classic solution.

"Unable to send a &str between threads because it does not live long enough": Some of the answers suggest using channels to communicate with the threads, but I'm not quite sure about using them. There should be an easier way of sharing just one object between threads.

"Why doesn't a local variable live long enough for thread::scoped?": This got my attention. scoped is supposed to fix my error, but since it is only in the unstable channel, I would like to see if there is another way of doing it using just spawn.

Can someone explain how I should fix the lifetimes so the iterator can be accessed from the threads?

Netwave asked Dec 24 '22 15:12


1 Answer

Here's a minimal, reproducible example of your problem:

use std::thread;

fn main() {
    let src = vec!["one"];
    let dst = vec!["two"];
    let mut iter = src.iter().zip(dst);
    thread::spawn(|| {
        while let Some((s, d)) = iter.next() {
            println!("{} -> {}", s, d);
        }
    });
}

There are multiple related problems:

  1. The iterator lives on the stack and the thread's closure takes a reference to it.
  2. The closure takes a mutable reference to the iterator.
  3. The iterator itself has a reference to a Vec that lives on the stack.
  4. The Vec itself has references to string slices that likely live on the stack but are not guaranteed to live longer than the thread either way.

Said another way, the Rust compiler has stopped you from introducing four separate sources of memory unsafety.

The main thing to recognize is that any thread you spawn might outlive the place where you spawned it. Even if you call join right away, the compiler cannot statically verify that this will happen, so it has to take the conservative path. This is the point of scoped threads: they guarantee that the thread exits before the stack frame it was started in goes away.
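Since Rust 1.63 the standard library also provides scoped threads as std::thread::scope. As a minimal sketch (the function name pair_up is made up for illustration, and it returns the lines instead of printing them so the result is easy to check), a single scoped thread is allowed to borrow the stack-allocated iterator mutably because the scope joins the thread before returning:

```rust
use std::thread;

/// Hypothetical helper: drives the zipped iterator on one scoped thread.
fn pair_up(src: &[&str], dst: &[&str]) -> Vec<String> {
    let mut iter = src.iter().zip(dst.iter());
    let mut out = Vec::new();

    thread::scope(|scope| {
        // The closure borrows `iter` and `out` from this stack frame.
        // That is allowed here because `thread::scope` joins every
        // spawned thread before it returns.
        scope.spawn(|| {
            while let Some((s, d)) = iter.next() {
                out.push(format!("{} -> {}", s, d));
            }
        });
    });

    // The borrows have ended, so `out` can be used again.
    out
}
```

Calling pair_up(&["one"], &["two"]) should yield a single "one -> two" entry. Only one thread holds the mutable borrow here, so this addresses the lifetime problems but not sharing between threads.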

Additionally, you are attempting to use a mutable reference in multiple concurrent threads. There's zero guarantee that the iterator (or any of the iterators it was built on) can be safely called in parallel. It's entirely possible that two threads call next at exactly the same time. The two pieces of code run in parallel and write to the same memory address. One thread writes half of the data and the other thread writes the other half, and now your program crashes at some arbitrary point in the future.
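If you really do want several threads pulling from one iterator, the calls to next have to be serialized somehow. One possible sketch, assuming Rust 1.63 or newer for std::thread::scope, wraps the iterator in a Mutex so that only one thread can advance it at a time. The names process_pairs and workers are invented for this example, and the formatted string stands in for the real work:

```rust
use std::sync::Mutex;
use std::thread;

/// Hypothetical helper: `workers` threads share one iterator behind a Mutex.
fn process_pairs(src: &[&str], dst: &[&str], workers: usize) -> Vec<String> {
    // Only the thread holding the lock may call `next`.
    let iter = Mutex::new(src.iter().zip(dst.iter()));
    let done = Mutex::new(Vec::new());

    thread::scope(|scope| {
        for _ in 0..workers {
            scope.spawn(|| loop {
                // The guard is a temporary, so the lock is released at the
                // end of this statement, before the item is processed.
                let next = iter.lock().unwrap().next();
                match next {
                    Some((s, d)) => done.lock().unwrap().push(format!("{} -> {}", s, d)),
                    None => break,
                }
            });
        }
    });

    // Threads finish in no particular order, so sort for a stable result.
    let mut done = done.into_inner().unwrap();
    done.sort();
    done
}
```

Each pair is handed to exactly one worker. Note that writing the loop as while let Some(..) = iter.lock().unwrap().next() would keep the lock held for the whole loop body, which would serialize the work as well as the iteration.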

Using a tool like crossbeam, your code would look something like:

use crossbeam; // 0.7.3

fn main() {
    let src = vec!["one"];
    let dst = vec!["two"];

    let mut iter = src.iter().zip(dst);
    while let Some((s, d)) = iter.next() {
        crossbeam::scope(|scope| {
            scope.spawn(|_| {
                println!("{} -> {}", s, d);
            });
        })
        .unwrap();
    }
}

Note that this will only spawn one thread at a time, waiting for it to finish, because each pass through the loop opens and closes its own scope. An alternative to get more parallelism (the usual point of this exercise) is to interchange the calls to next and spawn. This requires transferring ownership of s and d to the thread via the move keyword:

use crossbeam; // 0.7.3

fn main() {
    let src = vec!["one", "alpha"];
    let dst = vec!["two", "beta"];

    let mut iter = src.iter().zip(dst);
    crossbeam::scope(|scope| {
        while let Some((s, d)) = iter.next() {
            scope.spawn(move |_| {
                println!("{} -> {}", s, d);
            });
        }
    })
    .unwrap();
}

If you add a sleep call inside the spawn, you can see the threads run in parallel.

I'd have written it using a for loop, however:

let iter = src.iter().zip(dst);
crossbeam::scope(|scope| {
    for (s, d) in iter {
        scope.spawn(move |_| {
            println!("{} -> {}", s, d);
        });
    }
}).unwrap();

In the end, the iterator is exercised on the current thread, and each value returned from the iterator is then handed off to a new thread. The new threads are guaranteed to exit before the captured references become invalid.
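The same shape can be written without an external crate using std::thread::scope (Rust 1.63 or newer). This is only a sketch: describe and run_all are hypothetical names, and describe stands in for whatever each thread actually does, such as copying a file. It also shows that each scoped thread can hand a value back through its join handle:

```rust
use std::thread;

/// Hypothetical stand-in for the per-item work.
fn describe(s: &str, d: &str) -> String {
    format!("{} -> {}", s, d)
}

/// Hypothetical helper: one scoped thread per (source, destination) pair.
fn run_all(src: &[&str], dst: &[&str]) -> Vec<String> {
    thread::scope(|scope| {
        // The iterator is driven on the current thread; each item is
        // moved into its own thread. `collect` spawns them all up front.
        let handles: Vec<_> = src
            .iter()
            .zip(dst.iter())
            .map(|(s, d)| scope.spawn(move || describe(s, d)))
            .collect();

        // Joining in spawn order keeps the results in input order.
        handles
            .into_iter()
            .map(|h| h.join().unwrap())
            .collect::<Vec<String>>()
    })
}
```

For the two-element vectors used above, run_all(&["one", "alpha"], &["two", "beta"]) should return "one -> two" followed by "alpha -> beta". Spawning one thread per item is fine for a handful of items; for large inputs a fixed number of workers is usually a better fit.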

You may be interested in Rayon, a crate that allows easy parallelization of certain types of iterators.

See also:

  • How can I pass a reference to a stack variable to a thread?
  • Lifetime troubles sharing references between threads
  • How do I use static lifetimes with threads?
  • Thread references require static lifetime?
  • Lifetime woes when using threads
  • Cannot call a function in a spawned thread because it "does not fulfill the required lifetime"

Shepmaster answered Mar 19 '23 09:03