I'd like the ability to have multiple threads evaluate the same closure. The application I have in mind is parallelized numerical integration, so a situation where the function domain can be easily split into N chunks and handed to threads.
This is a simple function that evaluates the provided closure multiple times and averages the result:
use std::sync::mpsc;
use std::thread;
const THREAD_COUNT: u64 = 4;
fn average<F: Fn(f64) -> f64>(f: F) -> f64 {
let (tx, rx) = mpsc::channel();
for id in 0..THREAD_COUNT {
let thread_tx = tx.clone();
thread::spawn(move || {
thread_tx.send(f(id as f64));
});
}
let mut total = 0.0;
for id in 0..THREAD_COUNT {
total += rx.recv().unwrap();
}
total / THREAD_COUNT as f64
}
fn main() {
average(|x: f64| -> f64 { x });
}
When I compile I get this error:
error[E0277]: `F` cannot be sent between threads safely
--> src/main.rs:10:9
|
10 | thread::spawn(move || {
| ^^^^^^^^^^^^^ `F` cannot be sent between threads safely
|
= help: within `[closure@src/main.rs:10:23: 12:10 thread_tx:std::sync::mpsc::Sender<f64>, f:F, id:u64]`, the trait `std::marker::Send` is not implemented for `F`
= help: consider adding a `where F: std::marker::Send` bound
= note: required because it appears within the type `[closure@src/main.rs:10:23: 12:10 thread_tx:std::sync::mpsc::Sender<f64>, f:F, id:u64]`
= note: required by `std::thread::spawn`
So I add + Send
to the bounds on F
and get a new error:
error[E0310]: the parameter type `F` may not live long enough
--> src/main.rs:10:9
|
6 | fn average<F: Fn(f64) -> f64 + Send>(f: F) -> f64 {
| -- help: consider adding an explicit lifetime bound `F: 'static`...
...
10 | thread::spawn(move || {
| ^^^^^^^^^^^^^
|
note: ...so that the type `[closure@src/main.rs:10:23: 12:10 thread_tx:std::sync::mpsc::Sender<f64>, f:F, id:u64]` will meet its required lifetime bounds
--> src/main.rs:10:9
|
10 | thread::spawn(move || {
| ^^^^^^^^^^^^^
So I add + 'static
to F
and get this:
error[E0382]: capture of moved value: `f`
--> src/main.rs:11:28
|
10 | thread::spawn(move || {
| ------- value moved (into closure) here
11 | thread_tx.send(f(id as f64));
| ^ value captured here after move
|
= note: move occurs because `f` has type `F`, which does not implement the `Copy` trait
So I add + Copy
to F
and get:
error: the trait `core::marker::Copy` is not implemented for the type `[closure@src/test.rs:115:11: 115:26]
It seems every thread wants its own copy of the closure (because of move
) but closures don't implement Copy
so no luck. It seems strange to me because if the closures are never mutating state then what's the safety issue with multiple threads accessing them?
I can get the code to work by providing a regular function instead of a closure, but this makes my code non-generic, i.e. it only works for a specific function instead of for anything that's Fn(f64) -> f64
. And for the type of integration I'm doing, the functions integrated often have certain fixed variables mixed with the variable of integration, so it would seem natural to capture the fixed variables with a closure.
Is there some way to make this kind of multithreaded function evaluation work in a generic manner? Am I just thinking about things wrong?
The ultimate problem revolves around who owns the closure. The code as written states that ownership of the closure is transferred to average
. This function then tries to give the closure to multiple threads, which fails as you have seen, as you can't give one item to multiple children.
but closures don't implement
Copy
so no luck
As of Rust 1.26.0, closures do implement Clone
and Copy
if all of the captured variables do. This means your final example code now works as-is:
fn average<F: Fn(f64) -> f64 + Send + 'static + Copy>(f: F) -> f64 { /* ... */ }
However, it's possible that your closures won't implement Copy
or Clone
.
You cannot give out a reference to the closure owned by average
because the thread created with thread::spawn
may outlive the call to average
. When average
exits, any stack-allocated variables will be destroyed. Any use of them would cause memory unsafety, which Rust aims to prevent.
One solution is to use an Arc
. This will allow multiple shared owners of a single resource in a multithreaded context. When the wrapped closure is cloned, only a new reference is created. When all references disappear, the object is freed.
use std::{
sync::{mpsc, Arc},
thread,
};
const THREAD_COUNT: u64 = 4;
fn average<F>(f: F) -> f64
where
F: Fn(f64) -> f64 + Send + Sync + 'static,
{
let (tx, rx) = mpsc::channel();
let f = Arc::new(f);
for id in 0..THREAD_COUNT {
let thread_tx = tx.clone();
let f = f.clone();
thread::spawn(move || {
thread_tx.send(f(id as f64)).unwrap();
});
}
let mut total = 0.0;
for _ in 0..THREAD_COUNT {
total += rx.recv().unwrap();
}
total / THREAD_COUNT as f64
}
fn main() {
average(|x| x);
}
A more standard solution is to use scoped threads. These threads are guaranteed to exit by a certain time, which allows you to pass references that outlive the threads to the threads.
See also:
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With