
tokio join multiple tasks in rust

Imagine that some futures are stored in a Vec whose length is determined at runtime, and you need to join these futures concurrently. What should you do?

Obviously, following the example in the documentation of tokio::join!, manually enumerating each length the Vec could have (1, 2, 3, ...) and handling the respective case should work.

use tokio::join;

let mut v = Vec::new();
v.push(future_1);

// directly or indirectly you push many futures to the vector
 
v.push(future_N);

// one possible way to join these futures concurrently is

if v.len() == 0 {}
if v.len() == 1 { join!(v.pop().unwrap()); }
if v.len() == 2 { join!(v.pop().unwrap(), v.pop().unwrap()); }
// ...

I also noticed that, according to the documentation, tokio::join! takes a list as its parameter, but when I use syntax like

tokio::join!(v);

or something like

tokio::join![ v ] /  tokio::join![ v[..] ] / tokio::join![ v[..][..] ]

it just doesn't work.

So here is my question: is there a way to join these futures more efficiently, or am I missing something in what the documentation says?

asked Aug 26 '20 02:08 by bruceyuan



2 Answers

You can use futures::future::join_all to "merge" your collection of futures into a single future that resolves once all of the sub-futures have resolved.

answered Sep 18 '22 15:09 by Colonel Thirty Two



join_all and try_join_all, as well as the more versatile FuturesOrdered and FuturesUnordered utilities from the same futures crate, poll all of their futures within a single task. This is probably fine if the constituent futures are not often concurrently ready to perform work, but if you want to make use of CPU parallelism with the multi-threaded runtime, consider spawning the individual futures as separate tasks and waiting on the join handles:

use futures::future;

// ...

let outputs = future::try_join_all(v.into_iter().map(tokio::spawn)).await?;

You can also use the FuturesOrdered and FuturesUnordered combinators to process the outputs asynchronously in a stream:

use futures::stream::FuturesUnordered;
use futures::prelude::*;

// ...

let mut completion_stream = v.into_iter()
    .map(tokio::spawn)
    .collect::<FuturesUnordered<_>>();
while let Some(res) = completion_stream.next().await {
    // ...    
}

One caveat with using tasks is that they are not cancelled when the future (e.g. an async block) that has spawned the task and possibly owns the returned JoinHandle gets dropped. The JoinHandle::abort method needs to be used to explicitly cancel the task.

answered Sep 21 '22 15:09 by mzabaluev
