
How can I spawn asynchronous methods in a loop?

I have a vector of objects that have a resolve() method that uses reqwest to query an external web API. After I call the resolve() method on each object, I want to print the result of every request.

Here's my half-asynchronous code that compiles and works (but not really asynchronously):

for mut item in items {
    item.resolve().await;

    item.print_result();
}

I've tried to use tokio::join! to spawn all async calls and wait for them to finish, but I'm probably doing something wrong:

tokio::join!(items.iter_mut().for_each(|item| item.resolve()));

Here's the error I'm getting:

error[E0308]: mismatched types
  --> src\main.rs:25:51
   |
25 |     tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
   |                                                   ^^^^^^^^^^^^^^ expected `()`, found opaque type
   | 
  ::: src\redirect_definition.rs:32:37
   |
32 |     pub async fn resolve(&mut self) {
   |                                     - the `Output` of this `async fn`'s found opaque type
   |
   = note: expected unit type `()`
            found opaque type `impl std::future::Future`

How can I call the resolve() methods for all instances at once?


Update: the code below reflects the answer. Now I'm getting borrow checker errors that I don't really understand. Should I annotate some of my variables with 'static?

let mut items = get_from_csv(path);

let tasks: Vec<_> = items
    .iter_mut()
    .map(|item| tokio::spawn(item.resolve()))
    .collect();

for task in tasks {
    task.await;
}

for item in items {
    item.print_result();
}

Here are the errors I'm getting:

error[E0597]: `items` does not live long enough
  --> src\main.rs:18:25
   |
18 |       let tasks: Vec<_> = items
   |                           -^^^^
   |                           |
   |  _________________________borrowed value does not live long enough
   | |
19 | |         .iter_mut()
   | |___________________- argument requires that `items` is borrowed for `'static`
...
31 |   }
   |   - `items` dropped here while still borrowed

error[E0505]: cannot move out of `items` because it is borrowed
  --> src\main.rs:27:17
   |
18 |       let tasks: Vec<_> = items
   |                           -----
   |                           |
   |  _________________________borrow of `items` occurs here
   | |
19 | |         .iter_mut()
   | |___________________- argument requires that `items` is borrowed for `'static`
...
27 |       for item in items {
   |                   ^^^^^ move out of `items` occurs here

Asked by Djent on Jan 25 '23 at 19:01


1 Answer

Since you want to await the futures in parallel, you can spawn each one as its own task. Tasks run independently of each other and of the code that spawned them, so you can await their handles in any order.

Ideally you'd write something like this:

// spawn tasks that run in parallel
let tasks: Vec<_> = items
    .iter_mut()
    .map(|item| tokio::spawn(item.resolve()))
    .collect();
// now await them so that every resolve() call completes
for task in tasks {
    task.await.unwrap();
}
// and we're done
for item in &items {
    item.print_result();
}

But this will be rejected by the borrow checker, because the future returned by item.resolve() holds a borrowed reference to item. That reference is passed to tokio::spawn(), which requires its future to be 'static: the task may run on another thread and outlive the scope that created it, and the compiler cannot prove that item will live long enough. (The same kind of problem arises when you try to send a reference to local data to a thread.)
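The thread analogy can be sketched with the standard library alone. This is only an illustration under assumptions: `Item`, its fields, and the synchronous `resolve()` below are hypothetical stand-ins for the question's type, and no network request is made. `std::thread::spawn` has the same `'static` bound, so the borrowing version (left as a comment) is rejected, while a version in which each thread owns its item and returns it through the join handle compiles.

```rust
use std::thread;

// Hypothetical stand-in for the question's item type.
#[derive(Debug)]
struct Item {
    url: String,
    result: Option<String>,
}

impl Item {
    // Placeholder for the real network call; it only records a fake result.
    fn resolve(&mut self) {
        self.result = Some(format!("resolved {}", self.url));
    }
}

fn resolve_on_threads(items: Vec<Item>) -> Vec<Item> {
    // Rejected by the compiler, because the closure would borrow from `items`
    // while `thread::spawn` requires a `'static` closure:
    //     items.iter_mut().map(|item| thread::spawn(|| item.resolve()))

    // Accepted: each thread takes ownership of one item and returns it
    // through its join handle.
    let handles: Vec<_> = items
        .into_iter()
        .map(|mut item| {
            thread::spawn(move || {
                item.resolve();
                item
            })
        })
        .collect();

    // Joining in spawn order gives the items back in their original order.
    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker thread panicked"))
        .collect()
}
```

Calling `resolve_on_threads(items)` consumes the vector and returns a new one containing the same items with `result` filled in.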

There are several possible solutions to this; the one I find most elegant is to move each item into the async block passed to tokio::spawn(), and have the task hand it back to you once it's done. Basically, you consume the items vector to create the tasks and immediately rebuild it from the awaited results:

// note the use of `into_iter()` to consume `items`
let tasks: Vec<_> = items
    .into_iter()
    .map(|mut item| {
        tokio::spawn(async move {
            item.resolve().await;
            item
        })
    })
    .collect();
// await the tasks so that every resolve() completes and hands our items back
let mut items = vec![];
for task in tasks {
    items.push(task.await.unwrap());
}
// verify that we've got the results
for item in &items {
    item.print_result();
}

Runnable code in the playground.

Note that the futures crate contains a join_all function which is similar to what you need, except that it polls the individual futures concurrently within a single task, without ensuring that they run in parallel on separate threads. We can write a generic join_parallel that uses join_all, but also uses tokio::spawn to get parallel execution:

use std::future::Future;

async fn join_parallel<T: Send + 'static>(
    futs: impl IntoIterator<Item = impl Future<Output = T> + Send + 'static>,
) -> Vec<T> {
    let tasks: Vec<_> = futs.into_iter().map(tokio::spawn).collect();
    // unwrap the Result because it is introduced by tokio::spawn()
    // and isn't something our caller can handle
    futures::future::join_all(tasks)
        .await
        .into_iter()
        .map(Result::unwrap)
        .collect()
}

Using this function the code needed to answer the question boils down to just:

let items = join_parallel(items.into_iter().map(|mut item| async move {
    item.resolve().await;
    item
})).await;
for item in &items {
    item.print_result();
}

Again, runnable code in the playground.
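
As background for the remark that join_all only polls its futures: a Rust future does nothing until something polls it, which is why an executor (or tokio::spawn handing the future to the runtime) is needed to make progress. The following is a minimal standard-library sketch under assumptions: the free function `resolve` is a hypothetical stand-in that sets a flag instead of doing I/O, and `NoopWaker` is a throwaway waker that is sufficient only because this future never suspends.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; enough to poll a future that never suspends.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical stand-in for `resolve()`: sets a flag instead of doing I/O.
async fn resolve(done: Arc<AtomicBool>) {
    done.store(true, Ordering::SeqCst);
}

// Returns the flag's value before polling and after polling.
fn lazy_future_demo() -> (bool, bool) {
    let done = Arc::new(AtomicBool::new(false));

    // Calling the async fn only builds the future; its body has not run yet.
    let mut fut: Pin<Box<dyn Future<Output = ()>>> = Box::pin(resolve(done.clone()));
    let before = done.load(Ordering::SeqCst);

    // Polling is what drives the body; this future finishes on the first poll.
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let poll = fut.as_mut().poll(&mut cx);
    assert!(matches!(poll, Poll::Ready(())));

    (before, done.load(Ordering::SeqCst))
}
```

In real code the runtime does the polling; the manual poll here exists only to show that creating the future and running it are separate steps.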

Answered by user4815162342 on Jan 29 '23 at 06:01