I have a scenario where I need to iterate over a collection; for each item, a function f(item) is called and returns a Future[Unit]. However, I need each f(item) call to execute sequentially - they cannot run in parallel.
for(item <- it) f(item)
won't work because this starts all the calls in parallel.
How do I do it so they follow in sequence?
The Future.sequence function converts a collection of Futures into a single Future of a collection - in simple words, List[Future[T]] becomes Future[List[T]]. This is also known as composing Futures: if you have a list of independent jobs already running, their futures can be gathered into a single future of the list of results.
An ExecutionContext can execute program logic asynchronously, typically but not necessarily on a thread pool. A general-purpose ExecutionContext must execute any Runnable passed to its execute method asynchronously.
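For illustration, a minimal sketch of Future.sequence (note that sequence only gathers results; the futures handed to it are typically already running, possibly in parallel):
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Three futures that may already be running concurrently...
val futures: List[Future[Int]] = List(Future(1), Future(2), Future(3))

// ...gathered into a single future that completes when they all have.
val combined: Future[List[Int]] = Future.sequence(futures)
combined.foreach(results => println(results))  // List(1, 2, 3)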
If you don't mind a very localised var, you can serialise the asynchronous processing (each f(item)) as follows (flatMap does the serialization):
val fSerialized = {
  var fAccum = Future { () }
  for (item <- it) {
    println(s"Processing ${item}")
    fAccum = fAccum flatMap { _ => f(item) }
  }
  fAccum
}
fSerialized.onComplete { case resTry => println("All Done.") }
In general, avoid Await operations - they block (which rather defeats the point of async), consume resources and, in sloppy designs, can deadlock.
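For contrast, this is the blocking style to avoid (a sketch using it and f from the question - each Await.result parks the calling thread until that future finishes):
import scala.concurrent.Await
import scala.concurrent.duration._

// Sequential, but blocks the calling thread on every item - not recommended:
for (item <- it) Await.result(f(item), Duration.Inf)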
Cool Trick 1:
You can chain Futures together via that usual suspect, flatMap - it serializes asynchronous operations. Is there anything it can't do? ;-)
def f1 = Future { /* some background running logic here... */ }
def f2 = Future { /* other background running logic here... */ }

val fSerialized: Future[Unit] = f1 flatMap (res1 => f2)

fSerialized.onComplete { case resTry => println("Both Done: Success=" + resTry.isSuccess) }
None of the above blocks - the main thread runs straight through in a few dozen nanoseconds. In every case the Futures are used to run work on background threads, keep track of asynchronous state/results, and chain logic together.
fSerialized represents a composite of two different asynchronous operations chained together. As soon as the val is evaluated, it immediately starts f1 (running asynchronously). f1 runs like any Future - when it eventually finishes, it calls its onComplete callback block. Here's the cool bit - flatMap installs its argument as the f1 onComplete callback block - so f2 is initiated as soon as f1 completes, with no blocking, polling or wasteful resource usage. When f2 is complete, then fSerialized is complete - so it runs the fSerialized.onComplete callback block - printing "Both Done".
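To make the mechanics concrete, here is a rough, simplified sketch of how that chaining can be built from onComplete and a Promise (illustrative only; the real library flatMap handles more cases):
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

// Sketch: run `next` only once `first` has completed, without blocking any thread.
def chain[A, B](first: Future[A])(next: A => Future[B])(implicit ec: ExecutionContext): Future[B] = {
  val p = Promise[B]()
  first.onComplete {
    case Success(a) => p.completeWith(next(a)) // the second operation starts here
    case Failure(e) => p.failure(e)            // propagate the failure
  }
  p.future
}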
Not only that, but you can chain flatMaps as much as you like with neat, non-spaghetti code:
f1 flatMap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) ...
If you were to do that via Future.onComplete, you would have to embed the successive operations as nested onComplete layers:
f1.onComplete { case res1Try =>
  f2
  f2.onComplete { case res2Try =>
    f3
    f3.onComplete { case res3Try =>
      f4
      f4.onComplete { ... }
    }
  }
}
Not as nice.
Test to prove:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._

def f(item: Int): Future[Unit] = Future {
  print("Waiting " + item + " seconds ...")
  Console.flush
  blocking { Thread.sleep(item.seconds.toMillis) }
  println("Done")
}

val fSerial = f(4) flatMap (res1 => f(16)) flatMap (res2 => f(2)) flatMap (res3 => f(8))
fSerial.onComplete { case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess) }
Cool Trick 2:
for-comprehensions like this:
for {a <- aExpr; b <- bExpr; c <- cExpr; d <- dExpr} yield eExpr
are nothing but syntactic-sugar for this:
aExpr.flatMap { a =>
  bExpr.flatMap { b =>
    cExpr.flatMap { c =>
      dExpr.map { d =>
        eExpr
      }
    }
  }
}
That's a chain of flatMaps, followed by a final map.
That means that
f1 flatMap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) map(res4 => "Did It!")
is identical to
for {res1 <- f1; res2 <- f2; res3 <- f3; res4 <- f4} yield "Did It!"
Test to Prove (following on from previous test):
val fSerial = for {res1 <- f(4); res2 <- f(16); res3 <- f(2); res4 <- f(8)} yield "Did It!"
fSerial.onComplete { case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess) }
Not-So-Cool Trick 3:
Unfortunately you can't mix iterables and Futures in the same for-comprehension - it's a compile error:
val fSerial = {for {nextItem <- itemIterable; nextRes <- f(nextItem)} yield "Did It"}.last
And nesting fors creates a challenge. The following doesn't serialize but runs the async blocks in parallel: the nested comprehension doesn't chain the successive Futures with flatMap/map; it iterates the Iterable and so creates (and starts) every f(nextItem) up front - not the same thing!
val fSerial = {for {nextItem <- itemIterable} yield for {nextRes <- f(nextItem)} yield "Did It"}.last
Also, using foldLeft/foldRight plus flatMap as below doesn't work as you'd expect - all the async blocks are still processed in parallel. The culprit isn't Iterator.foldLeft/foldRight itself: f(item) is evaluated eagerly as an argument to serialize, so every Future has already started before the flatMap chaining ever sees it:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._

def f(item: Int): Future[Unit] = Future {
  print("Waiting " + item + " seconds ...")
  Console.flush
  blocking { Thread.sleep(item.seconds.toMillis) }
  println("Done")
}

val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)

val empty = Future[Unit] { () }
def serialize(f1: Future[Unit], f2: Future[Unit]) = f1 flatMap (res1 => f2)

//val fSerialized = itemIterable.iterator.foldLeft(empty){(fAccum, item) => serialize(fAccum, f(item))}
val fSerialized = itemIterable.iterator.foldRight(empty){(item, fAccum) => serialize(fAccum, f(item))}

fSerialized.onComplete { case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess) }
But this works (var involved):
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._

def f(item: Int): Future[Unit] = Future {
  print("Waiting " + item + " seconds ...")
  Console.flush
  blocking { Thread.sleep(item.seconds.toMillis) }
  println("Done")
}

val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)

var fSerial = Future { () }
for {nextItem <- itemIterable} fSerial = fSerial.flatMap(accumRes => f(nextItem))
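For completeness, the fold itself can also be made to serialize by deferring the f(item) call until inside the flatMap callback - a sketch using the same f and itemIterable as above:
val fSerializedFold: Future[Unit] =
  itemIterable.foldLeft(Future.successful(())) { (fAccum, item) =>
    fAccum.flatMap(_ => f(item))  // f(item) is only evaluated after fAccum completes
  }
fSerializedFold.onComplete { case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess) }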
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Runs `yourfunction` over `items` strictly one at a time, collecting the results in input order.
def seqFutures[T, U](items: TraversableOnce[T])(yourfunction: T => Future[U]): Future[List[U]] = {
  items.foldLeft(Future.successful[List[U]](Nil)) { (f, item) =>
    f.flatMap { x =>
      yourfunction(item).map(_ :: x)  // only starts once the previous future has completed
    }
  } map (_.reverse)  // results were prepended, so reverse restores input order
}
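For the original question, usage might look like this (a sketch reusing the f and the item list from the earlier tests; allDone is just an illustrative name):
val items = List(4, 16, 2, 8)
val allDone: Future[List[Unit]] = seqFutures(items)(f)
allDone.onComplete(resTry => println("All done sequentially. Success=" + resTry.isSuccess))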
If you are running sequentially because resource constraints prevent running more than one Future at a time, it may be easier to create and use a custom ExecutionContext with only a single thread.
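A minimal sketch of that idea (the names singleThreadEc and runOn are illustrative; remember to shut the executor down when you are finished with it):
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// All Futures built against this context run one at a time, in submission order.
val singleThreadEc: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())

def runOn(item: Int): Future[Unit] = Future {
  // the work for one item goes here
}(singleThreadEc)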