I'm trying to use a Condvar to limit the number of threads that are active at any given time. I'm having a hard time finding good examples on how to use Condvar. So far I have:
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    let thread_count_arc = Arc::new((Mutex::new(0), Condvar::new()));
    let mut i = 0;
    while i < 100 {
        let thread_count = thread_count_arc.clone();
        thread::spawn(move || {
            let &(ref num, ref cvar) = &*thread_count;
            {
                let mut start = num.lock().unwrap();
                if *start >= 20 {
                    cvar.wait(start);
                }
                *start += 1;
            }
            println!("hello");
            cvar.notify_one();
        });
        i += 1;
    }
}
The compiler error given is:
error[E0382]: use of moved value: `start`
  --> src/main.rs:16:18
   |
14 |                     cvar.wait(start);
   |                               ----- value moved here
15 |                 }
16 |                 *start += 1;
   |                  ^^^^^ value used here after move
   |
   = note: move occurs because `start` has type `std::sync::MutexGuard<'_, i32>`, which does not implement the `Copy` trait
I'm entirely unsure if my use of Condvar is correct. I tried staying as close as I could to the example in the Rust API documentation. What is the proper way to implement this?
Here's a version that compiles:
use std::{
    sync::{Arc, Condvar, Mutex},
    thread,
};

fn main() {
    let thread_count_arc = Arc::new((Mutex::new(0u8), Condvar::new()));

    let mut i = 0;
    while i < 100 {
        let thread_count = thread_count_arc.clone();
        thread::spawn(move || {
            let (num, cvar) = &*thread_count;

            let mut start = cvar
                .wait_while(num.lock().unwrap(), |start| *start >= 20)
                .unwrap();

            // Before Rust 1.42, use this:
            //
            // let mut start = num.lock().unwrap();
            // while *start >= 20 {
            //     start = cvar.wait(start).unwrap()
            // }

            *start += 1;

            println!("hello");
            cvar.notify_one();
        });
        i += 1;
    }
}
The important part can be seen from the signature of Condvar::wait_while or Condvar::wait:
pub fn wait_while<'a, T, F>(
    &self,
    guard: MutexGuard<'a, T>,
    condition: F
) -> LockResult<MutexGuard<'a, T>>
where
    F: FnMut(&mut T) -> bool,

pub fn wait<'a, T>(
    &self,
    guard: MutexGuard<'a, T>
) -> LockResult<MutexGuard<'a, T>>
This says that wait_while / wait consumes the guard, which is why you get the error you did - you no longer own start, so you can't call any methods on it!
These functions are doing a great job of reflecting how Condvars work - you give up the lock on the Mutex (represented by start) for a while, and when the function returns you get the lock again.
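To make the hand-off concrete, here is a minimal sketch of a guard being passed into wait and rebound from its return value. The helper name wait_for_flag and the boolean flag are assumptions made for illustration; they are not part of the code in the question.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical helper: blocks until another thread sets a shared flag,
/// then returns the flag's value.
fn wait_for_flag() -> bool {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair_for_setter = Arc::clone(&pair);

    let setter = thread::spawn(move || {
        let (flag, cvar) = &*pair_for_setter;
        // The temporary guard is dropped at the end of this statement.
        *flag.lock().unwrap() = true;
        cvar.notify_one();
    });

    let (flag, cvar) = &*pair;
    let mut guard = flag.lock().unwrap();
    while !*guard {
        // `wait` takes `guard` by value, unlocks the mutex while blocked,
        // and returns a new guard once the lock has been re-acquired.
        guard = cvar.wait(guard).unwrap();
    }
    let value = *guard;

    // Release the lock before joining so the setter can never be blocked on it.
    drop(guard);
    setter.join().unwrap();
    value
}
```

Calling wait_for_flag() from main should return true. The key line is the reassignment `guard = cvar.wait(guard).unwrap()`, which is what the code in the question was missing.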
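The fix is to hand the lock guard to wait_while / wait and then rebind start to the guard that the call returns. I've also switched from an if to a while, as encouraged by huon: a waiting thread can wake up spuriously, so the condition has to be checked again after every wakeup.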
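The compiling version above never decrements the count and does not wait for its threads to finish. The following is a fuller sketch under the assumption that each thread should free its slot when it is done. The function name run_limited, the peak counter, and the use of thread::yield_now as a stand-in for real work are all illustrative choices, not part of the original code.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical helper: runs `total` threads, letting at most `limit`
/// (assumed to be at least 1) be active at once. Returns the highest
/// number of simultaneously active threads that was observed.
fn run_limited(total: usize, limit: usize) -> usize {
    let state = Arc::new((Mutex::new(0usize), Condvar::new()));
    let peak = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..total)
        .map(|_| {
            let state = Arc::clone(&state);
            let peak = Arc::clone(&peak);
            thread::spawn(move || {
                let (active, cvar) = &*state;

                // Enter: loop so that a spurious wakeup re-checks the count.
                let mut count = active.lock().unwrap();
                while *count >= limit {
                    count = cvar.wait(count).unwrap();
                }
                *count += 1;
                peak.fetch_max(*count, Ordering::SeqCst);
                drop(count); // release the lock while "working"

                thread::yield_now(); // stand-in for real work

                // Leave: free the slot, then wake one waiting thread.
                *active.lock().unwrap() -= 1;
                cvar.notify_one();
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    peak.load(Ordering::SeqCst)
}
```

With run_limited(100, 20), the returned peak should never exceed 20. The lock is dropped before the work starts, so the mutex only protects the counter and not the work itself.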
For reference, the usual way to have a limited number of threads in a given scope is with a Semaphore.
Unfortunately, Semaphore was never stabilized, was deprecated in Rust 1.8 and was removed in Rust 1.9. There are crates available that add semaphores on top of other concurrency primitives.
let sema = Arc::new(Semaphore::new(20));

for i in 0..100 {
    let sema = sema.clone();
    thread::spawn(move || {
        let _guard = sema.acquire();
        println!("{}", i);
    });
}
This isn't quite doing the same thing, since each thread is not printing the total number of threads inside the scope when that thread entered it.
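The snippet above assumes a Semaphore type whose acquire method returns a guard. As a rough illustration of how such a type can be layered on Mutex and Condvar, here is a minimal sketch. It is not the removed standard library type and not the API of any particular crate; the names Semaphore, SemaphoreGuard, and available are chosen for this example, and wait_while requires Rust 1.42 or later.

```rust
use std::sync::{Condvar, Mutex};

/// A minimal counting semaphore (illustrative only).
pub struct Semaphore {
    permits: Mutex<usize>,
    cvar: Condvar,
}

/// Returns its permit to the semaphore when dropped.
pub struct SemaphoreGuard<'a> {
    sema: &'a Semaphore,
}

impl Semaphore {
    pub fn new(permits: usize) -> Self {
        Semaphore {
            permits: Mutex::new(permits),
            cvar: Condvar::new(),
        }
    }

    /// Blocks until a permit is free, then takes it.
    pub fn acquire(&self) -> SemaphoreGuard<'_> {
        let mut permits = self
            .cvar
            .wait_while(self.permits.lock().unwrap(), |p| *p == 0)
            .unwrap();
        *permits -= 1;
        SemaphoreGuard { sema: self }
    }

    /// Number of permits that are currently free.
    pub fn available(&self) -> usize {
        *self.permits.lock().unwrap()
    }
}

impl Drop for SemaphoreGuard<'_> {
    fn drop(&mut self) {
        // Give the permit back, then wake one thread blocked in `acquire`.
        *self.sema.permits.lock().unwrap() += 1;
        self.sema.cvar.notify_one();
    }
}
```

Wrapped in an Arc, this type can be shared between threads the same way as in the snippet: each thread holds `let _guard = sema.acquire();` for as long as it needs its slot, and the permit is released automatically when the guard goes out of scope.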