How to do sequential execution of Futures in Scala



I have a scenario where I walk over an iterator, and for each item a function f(item) is called that returns a Future[Unit].

However, the f(item) calls must execute sequentially; they cannot run in parallel.

for(item <- it)   f(item) 

won't work because this starts all the calls in parallel.

How do I do it so they follow in sequence?

2 Answers

If you don't mind a very localised var, you can serialise the asynchronous processing (each f(item)) as follows (flatMap does the serialization):

val fSerialized = {
  var fAccum = Future{()}
  for(item <- it) {
    println(s"Processing ${item}")
    fAccum = fAccum flatMap { _ => f(item) }
  }
  fAccum
}
fSerialized.onComplete{case resTry => println("All Done.")}

In general, avoid Await operations: they block, which defeats the point of async, consumes resources and, in sloppy designs, can deadlock.

Cool Trick 1:

You can chain Futures together via that usual suspect, flatMap, which serializes asynchronous operations. Is there anything it can't do? ;-)

def f1 = Future {
  // some background running logic here...
}
def f2 = Future {
  // other background running logic here...
}
val fSerialized: Future[Unit] = f1 flatMap(res1 => f2)
fSerialized.onComplete{case resTry => println("Both Done: Success=" + resTry.isSuccess)}

None of the above blocks: the main thread runs straight through without waiting for any of the work to finish. In every case, Futures run the work asynchronously, keep track of its state and result, and chain the logic.

fSerialized represents a composite of two different asynchronous operations chained together. As soon as the val is evaluated, it immediately starts f1 (running asynchronously). f1 runs like any Future: when it eventually finishes, it calls its onComplete callbacks. Here's the cool bit: flatMap installs its argument as an onComplete callback on f1, so f2 is initiated as soon as f1 completes successfully, with no blocking, polling or wasteful resource usage (if f1 fails, f2 is never started and the failure propagates to fSerialized). When f2 is complete, fSerialized is complete, so it runs the fSerialized.onComplete callback block, printing "Both Done".
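The ordering described above can be checked with a small sketch. The names here (FlatMapOrderDemo, step, record) are hypothetical and not part of the answer; step stands in for f1/f2 and logs when it starts and ends. Await appears only so the demo can read the log afterwards.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FlatMapOrderDemo {
  // Event log guarded by the object's lock, so the order of side effects can be inspected.
  private var events = Vector.empty[String]
  private def record(event: String): Unit = synchronized { events = events :+ event }

  // Hypothetical stand-in for f1/f2. It is a def, so a Future is only created when it is called.
  private def step(name: String): Future[String] = Future {
    record(s"start $name")
    Thread.sleep(50) // simulate some work
    record(s"end $name")
    name
  }

  def run(): Vector[String] = {
    synchronized { events = Vector.empty }
    // step("f2") sits inside the function passed to flatMap,
    // so it is not created (or started) until step("f1") has completed.
    val chained: Future[String] = step("f1").flatMap(_ => step("f2"))
    // Await is used only so this demo can return the log; avoid it in real code.
    Await.result(chained, 5.seconds)
    synchronized(events)
  }
}
```

Calling FlatMapOrderDemo.run() should give the events in strict order: start f1, end f1, start f2, end f2.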

Not only that, but you can chain flatMaps as much as you like with neat non-spaghetti code

f1 flatMap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) ...

If you were to do that via Future.onComplete, you would have to embed the successive operations as nested onComplete layers:

f1.onComplete{case res1Try =>
  f2
  f2.onComplete{case res2Try =>
    f3
    f3.onComplete{case res3Try =>
      f4
      f4.onComplete{ ...
      }
    }
  }
}

Not as nice.

Test to prove:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._
// Prints a message, sleeps for `item` seconds, then prints "Done"
def f(item: Int): Future[Unit] = Future{
  print("Waiting " + item + " seconds ...")
  Console.flush()
  blocking{Thread.sleep(item.seconds.toMillis)}
  println("Done")
}
val fSerial = f(4) flatMap(res1 => f(16)) flatMap(res2 => f(2)) flatMap(res3 => f(8))
fSerial.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}

Cool Trick 2:

for-comprehensions like this:

for {a <- aExpr; b <- bExpr; c <- cExpr; d <- dExpr} yield eExpr 

are nothing but syntactic-sugar for this:

aExpr.flatMap{a => bExpr.flatMap{b => cExpr.flatMap{c => dExpr.map{d => eExpr} } } } 

that's a chain of flatMaps, followed by a final map.

That means that

f1 flatMap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) map(res4 => "Did It!")

is identical to

for {res1 <- f1; res2 <- f2; res3 <- f3; res4 <- f4} yield "Did It!" 

Test to Prove (following on from previous test):

val fSerial = for {res1 <- f(4); res2 <- f(16); res3 <- f(2); res4 <- f(8)} yield "Did It!"
fSerial.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}
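One caveat worth illustrating: the for-comprehension only serializes Futures that are created inside it (as with def f1 or f(4) above). A Future held in a val has already started by the time the for-comprehension sees it. The sketch below is an illustration under that assumption; EagerVsLazyDemo, withVals and withDefs are hypothetical names, and a latch is used to detect whether the second Future started while the first was still running.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object EagerVsLazyDemo {
  // Each method returns true if the second Future started while the first was still running.

  def withVals(): Boolean = {
    val secondStarted = new CountDownLatch(1)
    // Both Futures are created here, so both start running right away.
    val first: Future[Boolean] = Future { blocking { secondStarted.await(5, TimeUnit.SECONDS) } }
    val second: Future[Unit] = Future { secondStarted.countDown() }
    // The for-comprehension only sequences the results; the work already overlaps.
    val combined = for { overlapped <- first; _ <- second } yield overlapped
    Await.result(combined, 10.seconds) // Await only so the demo can return a value
  }

  def withDefs(): Boolean = {
    val secondStarted = new CountDownLatch(1)
    // defs create a new Future on every call, so nothing runs until the for-comprehension asks for it.
    def first: Future[Boolean] = Future { blocking { secondStarted.await(200, TimeUnit.MILLISECONDS) } }
    def second: Future[Unit] = Future { secondStarted.countDown() }
    // second is only called after first has completed, so first's wait times out.
    val combined = for { overlapped <- first; _ <- second } yield overlapped
    Await.result(combined, 10.seconds)
  }
}
```

With the global ExecutionContext, withVals() should report an overlap (true) and withDefs() should not (false), which is why the tests above call f(...) inside the for-comprehension.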

Not-So-Cool Trick 3:

Unfortunately you can't mix iterators & futures in the same for-comprehension. Compile error:

val fSerial = {for {nextItem <- itemIterable; nextRes <- f(nextItem)} yield "Did It"}.last 

And nesting fors creates a challenge. The following doesn't serialize, but runs the async blocks in parallel: nested comprehensions don't chain subsequent Futures with flatMap/map, but instead desugar to Iterable.map{item => f(item).map(...)}, which creates (and starts) every Future immediately. Not the same!

val fSerial = {for {nextItem <- itemIterable} yield
                 for {nextRes <- f(nextItem)} yield "Did It"}.last

Also, using foldLeft/foldRight plus flatMap doesn't work as you'd expect when written as below. This is not a bug in the fold itself: serialize takes f2 as an ordinary (strict) argument, so f(item) is evaluated, and its Future started, before flatMap ever sees it. The fold runs through the whole collection immediately, so all async blocks are started at once and processed in parallel:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._
def f(item: Int): Future[Unit] = Future{
  print("Waiting " + item + " seconds ...")
  Console.flush()
  blocking{Thread.sleep(item.seconds.toMillis)}
  println("Done")
}
val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)
val empty = Future[Unit]{()}
// f2 is a strict parameter: the caller has already created (and started) it
def serialize(f1: Future[Unit], f2: Future[Unit]) = f1 flatMap(res1 => f2)
//val fSerialized = itemIterable.iterator.foldLeft(empty){(fAccum, item) => serialize(fAccum, f(item))}
val fSerialized = itemIterable.iterator.foldRight(empty){(item, fAccum) => serialize(fAccum, f(item))}
fSerialized.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}
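A possible fix for the fold above, sketched under the assumption that the only problem is the eager evaluation of f(item): declare the second parameter of serialize by-name, so that f(item) is evaluated inside flatMap. ByNameFoldDemo and record are hypothetical names, f is a shortened stand-in for the f used in the tests, and Await is there only so the log can be returned.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ByNameFoldDemo {
  // Event log guarded by the object's lock.
  private var events = Vector.empty[String]
  private def record(event: String): Unit = synchronized { events = events :+ event }

  // Stand-in for f(item): sleeps item * 10 ms, so a parallel run would finish out of order.
  private def f(item: Int): Future[Unit] = Future {
    record(s"start $item")
    Thread.sleep(item * 10L)
    record(s"end $item")
  }

  // `next` is by-name (=> Future[Unit]): f(item) is not evaluated until flatMap invokes the lambda.
  private def serialize(acc: Future[Unit], next: => Future[Unit]): Future[Unit] =
    acc.flatMap(_ => next)

  def run(items: List[Int]): Vector[String] = {
    synchronized { events = Vector.empty }
    val all = items.iterator.foldLeft(Future.successful(())) { (acc, item) =>
      serialize(acc, f(item))
    }
    Await.result(all, 10.seconds) // Await only so the demo can return the log
    synchronized(events)
  }
}
```

ByNameFoldDemo.run(List(4, 16, 2, 8)) should log each item's start and end before the next item starts, in input order. Writing acc.flatMap(_ => f(item)) directly in the fold has the same effect, since the call to f then sits inside the lambda.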

But this works (var involved):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._
def f(item: Int): Future[Unit] = Future{
  print("Waiting " + item + " seconds ...")
  Console.flush()
  blocking{Thread.sleep(item.seconds.toMillis)}
  println("Done")
}
val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)
var fSerial = Future{()}
// f(nextItem) sits inside the lambda, so it only starts once the previous Future has completed
for {nextItem <- itemIterable} fSerial = fSerial.flatMap(accumRes => f(nextItem))

// Calls yourfunction on each item in order, starting the next Future only after the
// previous one completes; results are collected in input order.
// Requires an implicit ExecutionContext in scope.
def seqFutures[T, U](items: TraversableOnce[T])(yourfunction: T => Future[U]): Future[List[U]] = {
  items.foldLeft(Future.successful[List[U]](Nil)) {
    (f, item) => f.flatMap {
      x => yourfunction(item).map(_ :: x)
    }
  } map (_.reverse)
}

If you are running sequentially because resource constraints prevent running more than one Future at a time, it may be easier to create and use a custom ExecutionContext with only a single thread.
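A minimal sketch of that idea, assuming each Future body is a plain synchronous block: SingleThreadDemo and its counters are hypothetical names, and the ExecutionContext is built from a standard single-thread executor. All Futures are submitted at once, but the executor runs them one at a time in submission order.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SingleThreadDemo {
  // Returns the results plus the highest number of Future bodies observed running at once.
  def run(items: List[Int]): (List[Int], Int) = {
    val executor = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(executor)
    val running = new AtomicInteger(0)
    val maxRunning = new AtomicInteger(0)
    try {
      // Every Future is created immediately, but the single worker thread
      // takes them from its queue one by one.
      val futures = items.map { item =>
        Future {
          val now = running.incrementAndGet()
          maxRunning.updateAndGet(m => math.max(m, now))
          Thread.sleep(10) // simulate work
          running.decrementAndGet()
          item
        }
      }
      // Await only so the demo can return a value.
      val results = Await.result(Future.sequence(futures), 10.seconds)
      (results, maxRunning.get())
    } finally executor.shutdown()
  }
}
```

Note the limits of this approach: it only guarantees that one Future body runs at a time on that context. If f(item) itself chains several asynchronous steps, steps belonging to different items may still interleave, so the flatMap chaining shown in the first answer remains the safer choice for strict ordering.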

