How do I use a Condvar to limit multithreading?

I'm trying to use a Condvar to limit the number of threads that are active at any given time. I'm having a hard time finding good examples of how to use Condvar. So far I have:

use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    let thread_count_arc = Arc::new((Mutex::new(0), Condvar::new()));
    let mut i = 0;
    while i < 100 {
        let thread_count = thread_count_arc.clone();
        thread::spawn(move || {
            let &(ref num, ref cvar) = &*thread_count;
            {
                let mut start = num.lock().unwrap();
                if *start >= 20 {
                    cvar.wait(start);
                }
                *start += 1;
            }
            println!("hello");
            cvar.notify_one();
        });
        i += 1;
    }
}

The compiler error given is:

error[E0382]: use of moved value: `start`
  --> src/main.rs:16:18
   |
14 |                     cvar.wait(start);
   |                               ----- value moved here
15 |                 }
16 |                 *start += 1;
   |                  ^^^^^ value used here after move
   |
   = note: move occurs because `start` has type `std::sync::MutexGuard<'_, i32>`, which does not implement the `Copy` trait

I'm entirely unsure if my use of Condvar is correct. I tried staying as close as I could to the example in the Rust API documentation. What is the proper way to implement this?

asked Sep 29 '22 04:09 by Dumbapples


2 Answers

Here's a version that compiles:

use std::{
    sync::{Arc, Condvar, Mutex},
    thread,
};

fn main() {
    let thread_count_arc = Arc::new((Mutex::new(0u8), Condvar::new()));
    let mut i = 0;
    while i < 100 {
        let thread_count = thread_count_arc.clone();
        thread::spawn(move || {
            let (num, cvar) = &*thread_count;

            let mut start = cvar
                .wait_while(num.lock().unwrap(), |start| *start >= 20)
                .unwrap();

            // Before Rust 1.42, use this:
            //
            // let mut start = num.lock().unwrap();
            // while *start >= 20 {
            //     start = cvar.wait(start).unwrap()
            // }

            *start += 1;

            println!("hello");
            cvar.notify_one();
        });
        i += 1;
    }
}

The important part can be seen from the signature of Condvar::wait_while or Condvar::wait:

pub fn wait_while<'a, T, F>(
    &self,
    guard: MutexGuard<'a, T>,
    condition: F
) -> LockResult<MutexGuard<'a, T>>
where
    F: FnMut(&mut T) -> bool,

pub fn wait<'a, T>(
    &self,
    guard: MutexGuard<'a, T>
) -> LockResult<MutexGuard<'a, T>>

This says that wait_while / wait consumes the guard, which is why you get the error you did - you no longer own start, so you can't call any methods on it!

These functions are doing a great job of reflecting how Condvars work - you give up the lock on the Mutex (represented by start) for a while, and when the function returns you get the lock again.

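The fix is to hand the guard to wait_while / wait and rebind start to the guard that the call returns. I've also switched from an if to a while, as encouraged by huon: a condition variable can wake up spuriously, so the condition has to be rechecked after every wakeup.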
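Note that for the counter to work as a limit, each thread also has to give its slot back when it finishes, and main has to wait for the threads before exiting. The following is a minimal sketch of that, not a definitive implementation. The function name run_limited, the (active, peak) tuple and the 1 ms sleep standing in for real work are all assumptions made for illustration, and limit is assumed to be greater than zero.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: runs `total` worker threads, allowing at most
/// `limit` (assumed > 0) inside the guarded section at once. Returns the
/// highest number of workers observed inside the section at the same time.
fn run_limited(total: usize, limit: usize) -> usize {
    // State behind the mutex: (currently active, peak active).
    let shared = Arc::new((Mutex::new((0usize, 0usize)), Condvar::new()));
    let mut handles = Vec::with_capacity(total);

    for _ in 0..total {
        let shared = Arc::clone(&shared);
        handles.push(thread::spawn(move || {
            let (state, cvar) = &*shared;

            // Enter: wait until a slot is free, then claim it.
            {
                let mut guard = cvar
                    .wait_while(state.lock().unwrap(), |s| s.0 >= limit)
                    .unwrap();
                guard.0 += 1;
                guard.1 = guard.1.max(guard.0);
            } // the lock is released here so workers can run concurrently

            // Stand-in for real work.
            thread::sleep(Duration::from_millis(1));

            // Leave: free the slot, release the lock, wake one waiter.
            let mut guard = state.lock().unwrap();
            guard.0 -= 1;
            drop(guard);
            cvar.notify_one();
        }));
    }

    // Without joining, main could return before the workers have run.
    for handle in handles {
        handle.join().unwrap();
    }

    let peak = shared.0.lock().unwrap().1;
    peak
}

fn main() {
    let peak = run_limited(100, 20);
    println!("peak concurrency: {}", peak);
}
```

Because a slot is only claimed while the lock is held and the count is below limit, the recorded peak cannot exceed limit.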

answered Oct 06 '22 20:10 by Shepmaster


For reference, the usual way to have a limited number of threads in a given scope is with a Semaphore.

Unfortunately, Semaphore was never stabilized, was deprecated in Rust 1.8 and was removed in Rust 1.9. There are crates available that add semaphores on top of other concurrency primitives.

let sema = Arc::new(Semaphore::new(20)); 

for i in 0..100 {
    let sema = sema.clone();
    thread::spawn(move || {
        let _guard = sema.acquire();
        println!("{}", i);
    });
}

This isn't quite doing the same thing, since each thread does not print the total number of threads that were inside the scope when it entered.
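Since the standard library no longer ships a Semaphore, one way to get the same shape of code is to build a small counting semaphore from a Mutex and a Condvar. This is only a sketch under stated assumptions: the Semaphore and SemaphoreGuard types and the acquire and available methods below are hypothetical names written for this example, not the removed std API and not the API of any particular crate.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical counting semaphore built from a Mutex and a Condvar.
struct Semaphore {
    permits: Mutex<usize>,
    cvar: Condvar,
}

/// Hands its permit back to the semaphore when dropped.
struct SemaphoreGuard<'a> {
    sema: &'a Semaphore,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Semaphore {
            permits: Mutex::new(permits),
            cvar: Condvar::new(),
        }
    }

    /// Blocks until a permit is available, then takes it.
    fn acquire(&self) -> SemaphoreGuard<'_> {
        let mut permits = self
            .cvar
            .wait_while(self.permits.lock().unwrap(), |p| *p == 0)
            .unwrap();
        *permits -= 1;
        SemaphoreGuard { sema: self }
    }

    /// Number of permits currently free (useful for inspection only).
    fn available(&self) -> usize {
        *self.permits.lock().unwrap()
    }
}

impl Drop for SemaphoreGuard<'_> {
    fn drop(&mut self) {
        // Return the permit, then wake one thread blocked in `acquire`.
        *self.sema.permits.lock().unwrap() += 1;
        self.sema.cvar.notify_one();
    }
}

fn main() {
    let sema = Arc::new(Semaphore::new(20));

    let handles: Vec<_> = (0..100)
        .map(|i| {
            let sema = Arc::clone(&sema);
            thread::spawn(move || {
                // At most 20 threads hold a guard at any one time.
                let _guard = sema.acquire();
                println!("{}", i);
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
}
```

Tying the release to Drop means a permit is returned even if the thread's closure returns early or panics while holding the guard.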

answered Oct 06 '22 19:10 by huon