Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to race a collection of Futures in Rust?

Given a collection of Futures, say a Vec<impl Future<..>>, how can I block and run all of the Futures concurrently until the first Future is ready?

The closest feature I can find is the select macro (which is also available in Tokio). Unfortunately it only works with an explicit number of Futures, instead of handling a collection of them.

There is an equivalent of this feature in Javascript, called Promise.race. Is there a way to do this in Rust?

Or perhaps there's a way to fulfill this use case using another pattern, perhaps with channels?

like image 248
modulitos Avatar asked Sep 03 '20 02:09

modulitos


People also ask

How do futures work rust?

Futures in rust allow you to define a task, like a network call or computation, to be run asynchronously. You can chain functions onto that result, transform it, handle errors, merge it with other futures, and perform many other computations on it.

What is Tokio :: Main?

The #[tokio::main] function is a macro. It transforms the async fn main() into a synchronous fn main() that initializes a runtime instance and executes the async main function. The details of the Tokio runtime will be covered later.


Video Answer


1 Answers

I figured out a solution using the select_all function from the futures library.

Here is a simple example to demonstrate how it can be used to race a collection of futures:

use futures::future::select_all;
use futures::FutureExt;
use tokio::time::{delay_for, Duration};

async fn get_async_task(task_id: &str, seconds: u64) -> &'_ str {
    println!("starting {}", task_id);
    let duration = Duration::new(seconds, 0);

    delay_for(duration).await;

    println!("{} complete!", task_id);
    task_id
}

#[tokio::main]
async fn main() {
    let futures = vec![

        // `select_all` expects the Futures iterable to implement UnPin, so we use `boxed` here to
        // allocate on the heap:
        // https://users.rust-lang.org/t/the-trait-unpin-is-not-implemented-for-genfuture-error-when-using-join-all/23612/3
        // https://docs.rs/futures/0.3.5/futures/future/trait.FutureExt.html#method.boxed

        get_async_task("task 1", 5).boxed(),
        get_async_task("task 2", 4).boxed(),
        get_async_task("task 3", 1).boxed(),
        get_async_task("task 4", 2).boxed(),
        get_async_task("task 5", 3).boxed(),
    ];

    let (item_resolved, ready_future_index, _remaining_futures) =
        select_all(futures).await;

    assert_eq!("task 3", item_resolved);
    assert_eq!(2, ready_future_index);
}

Here's a link to the code above: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=f32b2ed404624c1b0abe284914f8658d

Thanks to @Herohtar for suggesting select_all in the comments above!

like image 196
modulitos Avatar answered Oct 22 '22 22:10

modulitos