
For comprehension: how to run Futures sequentially

Given the following methods...

def doSomething1: Future[Int] = { ... }
def doSomething2: Future[Int] = { ... }
def doSomething3: Future[Int] = { ... }

... and the following for-comprehension:

for {
  x <- doSomething1
  y <- doSomething2
  z <- doSomething3
} yield x + y + z

The three methods run in parallel, but in my case doSomething2 MUST run after doSomething1 has finished. How do I run the three methods in sequence?

EDIT

As suggested by Philosophus42, here below is a possible implementation of doSomething1:

def doSomething1: Future[Int] = {
  // query the database for customers younger than 40;
  // `find` returns a `Future` containing the number of matches
  customerService.find(Json.obj("age" -> Json.obj("$lt" -> 40)))
}

... so the Future is created by an internal call to another method.

EDIT 2

Perhaps I simplified the use case too much, and I'm sorry. Let's try again with something closer to the real use case. Here is the actual for-comprehension:

for {
  // get all the transactions generated by the exchange service
  transactions <- exchange.orderTransactions(orderId)

  // for each transaction create a log
  logs <- Future.sequence(transactions.map { transaction =>
    for {
      // update trading order status
      _ <- orderService.findAndUpdate(transaction.orderId, "Executed")

      // create new log
      log <- logService.insert(Log(
        transactionId = transaction.id,
        orderId = transaction.orderId,
        ...
      ))
    } yield log
  })
} yield logs

What I'm trying to do is to create a log for each transaction associated with an order. logService.insert gets invoked many times even if transactions just contains one entry.

asked Sep 03 '15 11:09 by j3d



1 Answer

Comment on your post

First, what does the code inside doSomethingX look like? I am also puzzled by the claim that, with the code you posted, the futures run in parallel.

Answer

In order to make the Future execution sequential, just use

for {
  v1 <- Future { ..block1... } 
  v2 <- Future { ..block2... } 
} yield combine(v1, v2)

This works because the expression Future { ..body.. } starts the asynchronous computation at the moment the expression is evaluated.

With the above for-comprehension desugared

Future { ..block1.. }
  .flatMap( v1 => 
     Future { ..block2.. }
       .map( v2 => combine(v1,v2) )
  )

it becomes clear that

  • once Future { ...block1... } has its result available,
  • the function passed to flatMap is invoked, which
  • only then creates and starts Future { ...block2... }.

Thus Future { ...block2... } is executed after Future { ...block1... } has completed.
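The ordering argument above can be checked with a small sketch. It assumes the global ExecutionContext; `SequentialOrder`, `step`, and `record` are hypothetical names introduced only for this illustration. Each step logs when it starts and ends, and the second step is created inside the flatMap callback, so it cannot start before the first has finished.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequentialOrder {
  // Shared event log, guarded by `synchronized` because futures run on other threads.
  private val events = ListBuffer.empty[String]
  private def record(e: String): Unit = events.synchronized { events += e; () }

  // Hypothetical unit of work: logs its start and end, then returns `value`.
  def step(name: String, value: Int): Future[Int] = Future {
    record(s"start $name")
    Thread.sleep(50)
    record(s"end $name")
    value
  }

  def run(): (Int, List[String]) = {
    events.synchronized { events.clear() }
    // Same shape as the desugared for-comprehension:
    // step("block2", ...) is only called once block1's result is available.
    val combined: Future[Int] =
      step("block1", 1).flatMap { v1 =>
        step("block2", 2).map(v2 => v1 + v2)
      }
    // Blocking is acceptable here only because this is a demo.
    val sum = Await.result(combined, 5.seconds)
    (sum, events.synchronized { events.toList })
  }
}
```

Calling `SequentialOrder.run()` should return the sum 3 together with the events in the order start block1, end block1, start block2, end block2.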

Additional information

A Future

Future { 
  <block> 
} 

immediately submits the contained block to the ExecutionContext for execution.

Snippet 1:

val f1 = Future { <body> }
val f2 = Future { <otherbody> }

The two computations run in parallel (provided your ExecutionContext is set up to allow it), because both values are evaluated immediately, one right after the other.

Snippet 2:

The construct

def f1 = Future { ..... }

starts nothing when it is defined. A new Future is created and started each time f1 is called.
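The difference between the two snippets can be made visible by counting how often a Future body runs. This is a minimal sketch assuming the global ExecutionContext; `ValVersusDef`, `started`, `viaDef`, and `viaVal` are hypothetical names used only here.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ValVersusDef {
  // Counts how many Future bodies have been executed so far.
  private val started = new AtomicInteger(0)

  // `def`: defining it runs nothing; every call creates and starts a new Future.
  def viaDef: Future[Int] = Future { started.incrementAndGet() }

  def run(): (Int, Int, Int, Int, Int) = {
    started.set(0)
    val beforeCall = started.get()                   // viaDef not called yet
    val first      = Await.result(viaDef, 5.seconds) // first call, body runs
    val second     = Await.result(viaDef, 5.seconds) // second call, body runs again

    // `val`: the Future is created, and its body scheduled, exactly once.
    val viaVal: Future[Int] = Future { started.incrementAndGet() }
    val third      = Await.result(viaVal, 5.seconds)
    val thirdAgain = Await.result(viaVal, 5.seconds) // same Future, same cached result

    (beforeCall, first, second, third, thirdAgain)
  }
}
```

`ValVersusDef.run()` should return (0, 1, 2, 3, 3): the `def` runs its body on every call, while reading the `val` twice observes the same already-computed result.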

Edit:

j3d, I'm still confused about why your code does not work as expected, if it is true that the Future is created within the doSomethingX methods.

Here is a code snippet that shows that computeSomething2 is executed after computeSomething1:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object Playground {

  import scala.concurrent.ExecutionContext.Implicits.global

  def computeSomething1 : Future[Int] = {
    Future {
      for (i <- 1 to 10) {
        println("computeSomething1")
        Thread.sleep(500)
      }
      10
    }
  }

  def computeSomething2 : Future[String] = {
    Future {
      for(i <- 1 to 10) {
        println("computeSomething2")
        Thread.sleep(800)
      }
      "hello"
    }
  }

  def main(args: Array[String]) : Unit = {

    val resultFuture: Future[String] = for {
      v1 <- computeSomething1
      v2 <- computeSomething2
    } yield v2 + v1.toString

    // evil "wait" for result

    val result = Await.result(resultFuture, Duration.Inf)

    println( s"Result: ${result}")
  }
}

with output

computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething2
computeSomething2
computeSomething2
computeSomething2
computeSomething2
computeSomething2
computeSomething2
computeSomething2
computeSomething2
computeSomething2
Result: hello10

Edit 2

If you want them to be executed in parallel, create the futures beforehand (here f1 and f2)

def main(args: Array[String]) : Unit = {
  val f1 = computeSomething1
  val f2 = computeSomething2

  val resultFuture: Future[String] = for {
    v1 <- f1
    v2 <- f2
  } yield v2 + v1.toString

  // evil "wait" for result

  val result = Await.result(resultFuture, Duration.Inf)

  println( s"Result: ${result}")
}
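
For a collection of inputs, as in the question's second edit, note that Future.sequence(items.map(f)) calls f for every item up front, so the resulting futures may run concurrently. One possible way to chain them instead is a foldLeft over flatMap. This is only a sketch: `SequentialTraversal`, `traverseSequentially`, and `process` are hypothetical names, and `process` merely stands in for the per-transaction work.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SequentialTraversal {
  // Calls `f` for an item only after the Future of the previous item has completed.
  def traverseSequentially[A, B](items: List[A])(f: A => Future[B])(
      implicit ec: ExecutionContext): Future[List[B]] =
    items
      .foldLeft(Future.successful(List.empty[B])) { (accF, item) =>
        accF.flatMap(acc => f(item).map(_ :: acc))
      }
      .map(_.reverse) // results were prepended, so restore input order

  def run(): (List[Int], List[String]) = {
    import scala.concurrent.ExecutionContext.Implicits.global

    val events = ListBuffer.empty[String]
    def record(e: String): Unit = events.synchronized { events += e; () }

    // Hypothetical stand-in for "update the order, then insert a log".
    def process(id: Int): Future[Int] = Future {
      record(s"start $id")
      Thread.sleep(20)
      record(s"end $id")
      id * 10
    }

    val results = Await.result(traverseSequentially(List(1, 2, 3))(process), 5.seconds)
    (results, events.synchronized { events.toList })
  }
}
```

`SequentialTraversal.run()` should return List(10, 20, 30) with the events strictly alternating start and end for items 1, 2, and 3 in that order.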

answered Sep 23 '22 11:09 by Martin Senne