
Scala Nested Futures

I have two futures. campaignFuture yields a List[BigInt], and I want to call the second future, profileFuture, for each value in the list returned by the first one. The second future can only be called once the first one has completed. How do I achieve this in Scala?

campaignFuture(1923).flatMap?? (Maybe?)

def campaignFuture(advertiserId: Int): Future[List[BigInt]] = Future {
  val campaignHttpResponse = getCampaigns(advertiserId.intValue())
  parseProfileIds(campaignHttpResponse.entity.asString)
}

def profileFuture(profileId: Int): Future[List[String]] = Future {
  val profileHttpResponse = getProfiles(profileId.intValue())
  parseSegmentNames(profileHttpResponse.entity.asString)
}    
Anand asked Nov 12 '14 06:11



1 Answer

A for comprehension is not directly applicable here because we have a mix of Lists and Futures. So your friends are map and flatMap:

To react to each Future result as it completes

  import java.util.Date // needed for the timestamps printed below
  import scala.concurrent.{Future, Promise, Await}
  import scala.concurrent.duration.Duration
  import scala.concurrent.ExecutionContext.Implicits.global

  def campaignFuture(advertiserId: Int): Future[List[BigInt]] = Future {
    List(1, 2, 3)
  }
  def profileFuture(profileId: Int): Future[List[String]] = {
    // delayed Future: the Promise is completed after a random sleep of 0 to 4 seconds
    val p = Promise[List[String]]()
    Future {
      val delay: Int = (math.random() * 5).toInt
      Thread.sleep(delay * 1000)
      p.success(List(s"profile-for:$profileId", s"delayed:$delay sec"))
    }
    p.future
  }



  // type: Future[List[Future[List[String]]]]
  val listOfProfileFuturesFuture = campaignFuture(1).map { campaign =>
    campaign.map(id => profileFuture(id.toInt))
  }

  // React to each profile Future as soon as it is done
  listOfProfileFuturesFuture foreach { campaignFutureRes =>
    campaignFutureRes.foreach { profileFutureRes =>
      profileFutureRes.foreach(profileListEntry => println(s"${new Date} done: $profileListEntry"))
    }
  }


  // !!FOR TESTING PURPOSES ONLY - THIS CODE BLOCKS AND EXITS THE VM WHEN THE FUTURES ARE DONE!!
  println(s"${new Date} waiting for futures")
  listOfProfileFuturesFuture.foreach{listOfFut =>
    Await.ready(Future.sequence(listOfFut), Duration.Inf)
    println(s"${new Date} all futures done")
    System.exit(0)
  }
  scala.io.StdIn.readLine()

To get the result of all Futures at once

  import scala.concurrent.{Future, Await}
  import scala.concurrent.duration.Duration
  import scala.concurrent.ExecutionContext.Implicits.global

  def campaignFuture(advertiserId: Int): Future[List[BigInt]] = Future {
    List(1, 2, 3)
  }
  def profileFuture(profileId: Int): Future[List[String]] = Future {
    List(s"profile-for:$profileId")
  }


  // type: Future[List[Future[List[String]]]]
  val listOfProfileFutures = campaignFuture(1).map { campaign =>
    campaign.map(id => profileFuture(id.toInt))
  }

  // type: Future[List[List[String]]]
  val listOfProfileFuture = listOfProfileFutures.flatMap(s => Future.sequence(s))


  // print the result
  //listOfProfileFuture.foreach(println)
  //scala.io.StdIn.readLine()

  // wait for the result (THIS BLOCKS INDEFINITELY!)
  Await.result(listOfProfileFuture, Duration.Inf)

  • Future.sequence converts a List[Future[T]] into a Future[List[T]].
  • flatMap turns what would be a Future[Future[T]] into a Future[T].
  • If you need to wait for the result (BLOCKING!), use Await.
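
The map, Future.sequence and flatMap steps can also be collapsed into one call with Future.traverse from the standard library. The following is only a minimal sketch under the same assumptions as the stubbed code in this answer: campaignFuture and profileFuture return canned data instead of calling HTTP, and the names TraverseSketch and segmentsFor are hypothetical, introduced only for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object TraverseSketch {
  // Stand-ins for the HTTP-backed futures from the question (stubbed data).
  def campaignFuture(advertiserId: Int): Future[List[BigInt]] = Future {
    List(BigInt(1), BigInt(2), BigInt(3))
  }

  def profileFuture(profileId: Int): Future[List[String]] = Future {
    List(s"profile-for:$profileId")
  }

  // flatMap runs only after the campaign ids are available; Future.traverse
  // then starts one profileFuture per id and gathers the results in list order.
  def segmentsFor(advertiserId: Int): Future[List[List[String]]] =
    campaignFuture(advertiserId).flatMap { ids =>
      Future.traverse(ids)(id => profileFuture(id.toInt))
    }

  def main(args: Array[String]): Unit = {
    // Blocking with a finite timeout, for demonstration only.
    val result = Await.result(segmentsFor(1923), 10.seconds)
    println(result.flatten) // List(profile-for:1, profile-for:2, profile-for:3)
  }
}
```

A finite timeout is used here instead of Duration.Inf so that a stuck future surfaces as a TimeoutException rather than blocking forever. If any single profileFuture fails, the combined future fails as well.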
j-keck answered Sep 28 '22 18:09
