How can I join all the futures in a vector without cancelling on failure like join_all does?

I've got a Vec of futures created from calling an async function. After adding all the futures to the vector, I'd like to wait on the whole set, getting either a list of results, or a callback for each one that finishes.

I could simply iterate over the vector and call .await on each future. That would let me handle errors correctly and avoid having futures::future::join_all cancel the others, but I'm sure there's a more idiomatic way to accomplish this task.

I also want to handle the futures as they complete, so that if I get enough information from the first few, I can cancel the remaining incomplete futures instead of waiting for them, and discard their results, error or not. This would not be possible if I iterated over the vector in order.

What I'm looking for is a callback (closure, etc) that lets me accumulate the results as they come in so that I can handle errors appropriately or cancel the remainder of the futures (from within the callback) if I determine I don't need the rest of them.

I can tell that's asking for a headache from the borrow checker: trying to modify a future in a Vec in a callback from the async engine.

There are a number of Stack Overflow questions and Reddit posts that explain how join_all joins a list of futures but cancels the rest if one fails, and that argue over whether async engines spawn threads and whether it's bad design if they do.

asked Apr 28 '20 13:04 by stu


1 Answer

Use futures::future::select_all:

use futures::future; // 0.3.4

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = Error> = std::result::Result<T, E>;

// Returns an error when `fails` is true, otherwise a value.
async fn might_fail(fails: bool) -> Result<i32> {
    if fails {
        Err("boom".into())
    } else {
        Ok(42)
    }
}

async fn many() -> Result<i32> {
    let raw_futs = vec![might_fail(true), might_fail(false), might_fail(true)];
    let unpin_futs: Vec<_> = raw_futs.into_iter().map(Box::pin).collect();
    let mut futs = unpin_futs;

    let mut sum = 0;

    while !futs.is_empty() {
        match future::select_all(futs).await {
            (Ok(val), _index, remaining) => {
                sum += val;
                futs = remaining;
            }
            (Err(_e), _index, remaining) => {
                // Ignoring all errors
                futs = remaining;
            }
        }

        if sum > 42 {
            // Early exit
            return Ok(sum);
        }
    }

    Ok(sum)
}

select_all polls all the futures in the collection and resolves to a tuple: the output of the first future that is not pending, its index, and the remaining futures (still pending or not yet polled). You can then match on the Result and handle the success or failure case.

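Call select_all inside a loop. This gives you the ability to exit early from the function. The loop condition !futs.is_empty() matters because select_all panics when given an empty collection. When you exit, the futs vector is dropped, and dropping a future cancels it.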
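The claim that dropping a future cancels it can be checked without any async runtime. The following is a rough sketch using only the standard library; NoopWaker, Never, Guard, and poll_once_then_drop are illustrative names invented here. It polls a future once by hand, drops it, and checks that the code after the await point never ran while the values the future owned were still cleaned up.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough to poll a future by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// A future that is never ready, standing in for slow I/O.
struct Never;

impl Future for Never {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Pending
    }
}

/// Sets its flag when dropped, so cleanup can be observed.
struct Guard(Arc<AtomicBool>);

impl Drop for Guard {
    fn drop(&mut self) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Polls a future once, drops it, and reports (cleaned_up, finished).
fn poll_once_then_drop() -> (bool, bool) {
    let cleaned_up = Arc::new(AtomicBool::new(false));
    let finished = Arc::new(AtomicBool::new(false));

    let guard = Guard(cleaned_up.clone());
    let done = finished.clone();
    let mut fut = Box::pin(async move {
        let _guard = guard; // held across the await point
        Never.await;
        done.store(true, Ordering::SeqCst); // never reached
    });

    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    assert!(fut.as_mut().poll(&mut cx).is_pending());

    drop(fut); // cancellation: the future is never polled again

    (cleaned_up.load(Ordering::SeqCst), finished.load(Ordering::SeqCst))
}

fn main() {
    let (cleaned_up, finished) = poll_once_then_drop();
    println!("cleaned up: {cleaned_up}, finished: {finished}");
}
```

Note that this only covers futures owned directly. Work handed to an executor's spawn function typically keeps running after its handle is dropped, depending on the runtime.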

answered Oct 11 '22 14:10 by Shepmaster
