Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Rust: Safe multi threading with recursion

I'm new to Rust. For learning purposes, I'm writing a simple program to search for files in Linux, and it uses a recursive function:

fn ffinder(base_dir:String, prmtr:&'static str, e:bool, h:bool) -> std::io::Result<()>{
    let mut handle_vec = vec![];
    let pth = std::fs::read_dir(&base_dir)?;
    for p in pth {
        let p2 = p?.path().clone();
        if p2.is_dir() {
            if !h{ //search doesn't include hidden directories
                let sstring:String = get_fname(p2.display().to_string());
                let slice:String = sstring[..1].to_string();
                if slice != ".".to_string() {
                    let handle = thread::spawn(move || {
                        ffinder(p2.display().to_string(),prmtr,e,h);
                    });
                    handle_vec.push(handle);
                }
            }
            else {//search include hidden directories
                let handle2 = thread::spawn(move || {
                    ffinder(p2.display().to_string(),prmtr,e,h);
                });
                handle_vec.push(handle2);
            } 
        }
        else {
            let handle3 = thread::spawn(move || {
                if compare(rmv_underline(get_fname(p2.display().to_string())),rmv_underline(prmtr.to_string()),e){
                    println!("File found at: {}",p2.display().to_string().blue());
                }
            });
            handle_vec.push(handle3);
        }
    }
    for h in handle_vec{
        h.join().unwrap();
    }
    Ok(())
}

I've tried to use multi threading (thread::spawn), however it can create too many threads, exploding the OS limit, which breaks the program execution.

Is there a way to multi thread with recursion, using a safe,limited (fixed) amount of threads?

like image 895
Alexandre S Avatar asked May 30 '26 06:05

Alexandre S


1 Answers

As one of the commenters mentioned, this is an absolutely perfect case for using Rayon. The blog post mentioned doesn't show how Rayon might be used in recursion, only making an allusion to crossbeam's scoped threads with a broken link. However, Rayon provides its own scoped threads implementation that solves your problem as well in that only uses as many threads as you have cores available, avoiding the error you ran into.

Here's the documentation for it:

https://docs.rs/rayon/1.0.1/rayon/fn.scope.html

Here's an example from some code I recently wrote. Basically what it does is recursively scan a folder, and each time it nests into a folder it creates a new job to scan that folder while the current thread continues. In my own tests it vastly outperforms a single threaded approach.

let source = PathBuf::from("/foo/bar/");

let (tx, rx) = mpsc::channel();
rayon::scope(|s| scan(&source, tx, s));

fn scan<'a, U: AsRef<Path>>(
    src: &U,
    tx: Sender<(Result<DirEntry, std::io::Error>, u64)>,
    scope: &Scope<'a>,
) {
    let dir = fs::read_dir(src).unwrap();
    dir.into_iter().for_each(|entry| {
        let info = entry.as_ref().unwrap();
        let path = info.path();

        if path.is_dir() {
            let tx = tx.clone();
            scope.spawn(move |s| scan(&path, tx, s)) // Recursive call here
        } else {
            // dbg!("{}", path.as_os_str().to_string_lossy());
            let size = info.metadata().unwrap().len();
            tx.send((entry, size)).unwrap();
        }
    });
}

I'm not an expert on Rayon, but I'm fairly certain the threading strategy works like this:

Rayon creates a pool of threads to match the number of logical cores you have available in your environment. The first call to the scoped function creates a job that the first available thread "steals" from the queue of jobs available. Each time we make another recursive call, it doesn't necessarily execute immediately, but it creates a new job that an idle thread can then "steal" from the queue. If all of the threads are busy, the job queue just fills up each time we make another recursive call, and each time a thread finishes its current job it steals another job from the queue.

The full code can be found here: https://github.com/1Dragoon/fcp

(Note that repo is a work in progress and the code there is currently typically broken and probably won't work at the time you're reading this.)

As an exercise to the reader, I'm more of a sys admin than an actual developer, so I also don't know if this is the ideal approach. From Rayon's documentation linked earlier:

scope() is a more flexible building block compared to join(), since a loop can be used to spawn any number of tasks without recursing

The language of that is a bit confusing. I'm not sure what they mean by "without recursing". Join seems to intend for you to already have tasks known about ahead of time and to execute them in parallel as threads become available, whereas scope seems to be more aimed at only creating jobs when you need them rather than having to know everything you need to do in advance. Either that or I'm understanding their meaning backwards.

like image 121
Dragoon Avatar answered Jun 02 '26 03:06

Dragoon