Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Executing a collection of futures sequentially

Tags:

rust

future

I have a collection of futures which I want to combine into a single future that gets them executed sequentially.

I looked into the futures_ordered function. It seems to return the results sequentially but the futures get executed concurrently.

I tried to fold the futures, combining them with and_then. However, that is tricky with the type system.

let tasks = vec![ok(()), ok(()), ok(())];

let combined_task = tasks.into_iter().fold(
    ok(()),                             // seed
    |acc, task| acc.and_then(|_| task), // accumulator
);

playground

This gives the following error:

error[E0308]: mismatched types
  --> src/main.rs:10:21
   |
10 |         |acc, task| acc.and_then(|_| task), // accumulator
   |                     ^^^^^^^^^^^^^^^^^^^^^^ expected struct `futures::FutureResult`, found struct `futures::AndThen`
   |
   = note: expected type `futures::FutureResult<_, _>`
              found type `futures::AndThen<futures::FutureResult<_, _>, futures::FutureResult<(), _>, [closure@src/main.rs:10:34: 10:42 task:_]>`

I'm probably approaching this wrong but I've run out of ideas.

like image 879
Jaanus Varus Avatar asked Jan 03 '18 16:01

Jaanus Varus


People also ask

What is future sequence?

sequence takes a list of futures and transforms it into a single future of list in an asynchronous manner. For instance, assume that you have a list of independent jobs to be run simultaneously. In such a case, the list of futures can be composed into a single future of list using Future.

What is Scala Futures?

Future represents a result of an asynchronous computation that may or may not be available yet. When we create a new Future, Scala spawns a new thread and executes its code. Once the execution is finished, the result of the computation (value or exception) will be assigned to the Future.

Why use future in Scala?

When you want to write parallel and concurrent applications in Scala, you could still use the native Java Thread — but the Scala Future makes parallel/concurrent programming much simpler, and it's preferred.

What is a promise Scala?

Promise is an object which can be completed with a value or failed with an exception. A promise should always eventually be completed, whether for success or failure, in order to avoid unintended resource retention for any associated Futures' callbacks or transformations. Source Promise.scala. AnyRef, Any.


1 Answers

Combine iter_ok and Stream::for_each:

use futures::Stream;
use futures::future::ok;
use futures::stream::iter_ok;

let tasks = vec![ok(()), ok(()), ok(())];

let combined_task = iter_ok::<_, ()>(tasks).for_each(|f| f);

iter_ok produces a stream of the passed items, and never throws an error (that is why you sometimes need to fix the error type). The closure passed to for_each then returns a Future to be run for each item - here simply the items that were passed in.

for_each then drives each returned future to completion before moving to the next one, like you wanted. It will also abort with the first error it encounters, and requires the inner futures to return () on success.

for_each itself returns a Future that will either fail (like described above) or return () on completion.

test tests::bench_variant_buffered ... bench:      22,356 ns/iter (+/- 1,816)
test tests::bench_variant_boxed ...    bench:       8,575 ns/iter (+/- 1,042)
test tests::bench_variant_for_each ... bench:       4,070 ns/iter (+/- 531)
like image 145
Stefan Avatar answered Oct 03 '22 23:10

Stefan