I have a collection of futures which I want to combine into a single future that gets them executed sequentially.
I looked into the futures_ordered
function. It seems to return the results sequentially but the futures get executed concurrently.
I tried to fold
the futures, combining them with and_then
. However, that is tricky with the type system.
let tasks = vec![ok(()), ok(()), ok(())];
let combined_task = tasks.into_iter().fold(
ok(()), // seed
|acc, task| acc.and_then(|_| task), // accumulator
);
playground
This gives the following error:
error[E0308]: mismatched types
--> src/main.rs:10:21
|
10 | |acc, task| acc.and_then(|_| task), // accumulator
| ^^^^^^^^^^^^^^^^^^^^^^ expected struct `futures::FutureResult`, found struct `futures::AndThen`
|
= note: expected type `futures::FutureResult<_, _>`
found type `futures::AndThen<futures::FutureResult<_, _>, futures::FutureResult<(), _>, [closure@src/main.rs:10:34: 10:42 task:_]>`
I'm probably approaching this wrong but I've run out of ideas.
sequence takes a list of futures and transforms it into a single future of list in an asynchronous manner. For instance, assume that you have a list of independent jobs to be run simultaneously. In such a case, the list of futures can be composed into a single future of list using Future.
Future represents a result of an asynchronous computation that may or may not be available yet. When we create a new Future, Scala spawns a new thread and executes its code. Once the execution is finished, the result of the computation (value or exception) will be assigned to the Future.
When you want to write parallel and concurrent applications in Scala, you could still use the native Java Thread — but the Scala Future makes parallel/concurrent programming much simpler, and it's preferred.
Promise is an object which can be completed with a value or failed with an exception. A promise should always eventually be completed, whether for success or failure, in order to avoid unintended resource retention for any associated Futures' callbacks or transformations. Source Promise.scala. AnyRef, Any.
Combine iter_ok
and Stream::for_each
:
use futures::Stream;
use futures::future::ok;
use futures::stream::iter_ok;
let tasks = vec![ok(()), ok(()), ok(())];
let combined_task = iter_ok::<_, ()>(tasks).for_each(|f| f);
iter_ok
produces a stream of the passed items, and never throws an error (that is why you sometimes need to fix the error type). The closure passed to for_each
then returns a Future
to be run for each item - here simply the items that were passed in.
for_each
then drives each returned future to completion before moving to the next one, like you wanted. It will also abort with the first error it encounters, and requires the inner futures to return ()
on success.
for_each
itself returns a Future
that will either fail (like described above) or return ()
on completion.
test tests::bench_variant_buffered ... bench: 22,356 ns/iter (+/- 1,816)
test tests::bench_variant_boxed ... bench: 8,575 ns/iter (+/- 1,042)
test tests::bench_variant_for_each ... bench: 4,070 ns/iter (+/- 531)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With