I've got a Vec of futures created from calling an async function. After adding all the futures to the vector, I'd like to wait on the whole set, getting either a list of results, or a callback for each one that finishes.
I could simply loop over the vector of futures and call .await on each one. That would let me handle errors correctly and would not have futures::future::join_all cancel the others, but I'm sure there's a more idiomatic way to accomplish this task.
I also want to handle the futures as they complete, so that if I get enough information from the first few, I can cancel the remaining incomplete futures instead of waiting for them, and discard their results, error or not. This would not be possible if I iterated over the vector in order.
What I'm looking for is a callback (closure, etc) that lets me accumulate the results as they come in so that I can handle errors appropriately or cancel the remainder of the futures (from within the callback) if I determine I don't need the rest of them.
I can tell that's asking for a headache from the borrow checker: trying to modify a future in a Vec in a callback from the async engine.
There are a number of Stack Overflow questions and Reddit posts that explain how join_all joins on a list of futures but cancels the rest if one fails, and how async engines may or may not spawn threads, or are bad design if they do.
Use futures::future::select_all:
    use futures::future; // 0.3.4

    type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
    type Result<T, E = Error> = std::result::Result<T, E>;

    // Returns an error when `fails` is true, otherwise a value.
    async fn might_fail(fails: bool) -> Result<i32> {
        if fails {
            Err("boom".into())
        } else {
            Ok(42)
        }
    }

    async fn many() -> Result<i32> {
        let raw_futs = vec![might_fail(true), might_fail(false), might_fail(true)];
        // select_all requires Unpin futures, so pin each one on the heap.
        let unpin_futs: Vec<_> = raw_futs.into_iter().map(Box::pin).collect();
        let mut futs = unpin_futs;

        let mut sum = 0;

        while !futs.is_empty() {
            // Wait for whichever future finishes first.
            match future::select_all(futs).await {
                (Ok(val), _index, remaining) => {
                    sum += val;
                    futs = remaining;
                }
                (Err(_e), _index, remaining) => {
                    // Ignoring all errors
                    futs = remaining;
                }
            }

            if sum > 42 {
                // Early exit; the futures still in `futs` are dropped here.
                return Ok(sum);
            }
        }

        Ok(sum)
    }
This polls every future in the collection and returns the output of the first one that is no longer pending, along with its index and the remaining futures (still pending or not yet polled). You can then match on the Result and handle the success or failure case.
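To make the polling behaviour described above concrete, here is a minimal std-only sketch. It is not how the futures crate implements select_all: first_ready is a blocking, busy-polling stand-in, and the names BoxFut, NoopWake, ReadyAfter, boxed, first_ready and demo are all hypothetical helpers invented for this illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical alias for this sketch: a heap-pinned, type-erased future.
type BoxFut<T> = Pin<Box<dyn Future<Output = T>>>;

// Waker that does nothing; sufficient because first_ready simply re-polls in a loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Toy future (hypothetical): returns Pending `polls_left` times, then yields `value`.
struct ReadyAfter {
    polls_left: u32,
    value: i32,
}

impl Future for ReadyAfter {
    type Output = i32;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<i32> {
        if self.polls_left == 0 {
            Poll::Ready(self.value)
        } else {
            self.polls_left -= 1;
            Poll::Pending
        }
    }
}

// Builds a boxed toy future.
fn boxed(polls_left: u32, value: i32) -> BoxFut<i32> {
    Box::pin(ReadyAfter { polls_left, value })
}

// Simplified, blocking stand-in for select_all: poll the futures in order until
// one is ready, then return its output, its index, and the futures left over.
fn first_ready<T>(mut futs: Vec<BoxFut<T>>) -> (T, usize, Vec<BoxFut<T>>) {
    assert!(!futs.is_empty(), "need at least one future");
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        for i in 0..futs.len() {
            let polled = futs[i].as_mut().poll(&mut cx);
            if let Poll::Ready(out) = polled {
                // Discard the finished future and hand back the rest.
                drop(futs.remove(i));
                return (out, i, futs);
            }
        }
    }
}

// Runs the sketch on three toy futures; returns (value, index, remaining count).
fn demo() -> (i32, usize, usize) {
    let futs = vec![boxed(2, 10), boxed(0, 20), boxed(1, 30)];
    let (value, index, remaining) = first_ready(futs);
    (value, index, remaining.len())
}
```

In this sketch the second future is the first to report ready, so demo returns its value and index 1, with two futures handed back. The real select_all is itself a future and relies on the executor's waker rather than spinning.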
Call select_all inside of a loop. This gives you the ability to exit early from the function. When you exit, the futs vector is dropped, and dropping a future cancels it.
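The claim that dropping a future cancels it can be checked with a small std-only sketch. It assumes a hand-rolled no-op waker and a future that never completes in place of real I/O; Never, Guard, NoopWake and poll_once_then_drop are hypothetical names used only for this illustration.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; the future is polled by hand exactly once.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical future that never completes, standing in for slow I/O.
struct Never;

impl Future for Never {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Pending
    }
}

// Sets a flag when dropped, so that cleanup is observable from outside.
struct Guard(Rc<Cell<bool>>);

impl Drop for Guard {
    fn drop(&mut self) {
        self.0.set(true);
    }
}

// Polls a future once, drops it, and reports (cleanup_ran, body_finished).
fn poll_once_then_drop() -> (bool, bool) {
    let cleaned_up = Rc::new(Cell::new(false));
    let finished = Rc::new(Cell::new(false));

    let guard = Guard(Rc::clone(&cleaned_up));
    let finished_flag = Rc::clone(&finished);
    let mut fut = Box::pin(async move {
        let _guard = guard; // stored inside the future's state
        Never.await; // the future stays suspended at this point
        finished_flag.set(true); // never reached
    });

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    assert!(fut.as_mut().poll(&mut cx).is_pending());

    // Cancel: the future is dropped and will never be polled again.
    drop(fut);
    (cleaned_up.get(), finished.get())
}
```

The guard's destructor runs when the future is dropped, while the code after the suspended await never executes. The same thing happens to whatever is still in futs when the function returns early.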