
Twitter's Future.collect not working concurrently (Scala)

Coming from a Node.js background, I am new to Scala and tried using Twitter's Future.collect to perform some simple concurrent operations. But my code runs the tasks sequentially rather than concurrently. What am I doing wrong?

Here's my code:

import com.twitter.util.Future

def waitForSeconds(seconds: Int, container:String): Future[String]  = Future[String] {
  Thread.sleep(seconds*1000)
  println(container + ": done waiting for " + seconds + " seconds")
  container + " :done waiting for " + seconds + " seconds"
}

def mainFunction:String = {
  val allTasks = Future.collect(Seq(waitForSeconds(1, "All"), waitForSeconds(3, "All"), waitForSeconds(2, "All")))
  val singleTask = waitForSeconds(1, "Single")

  allTasks onSuccess  { res =>
    println("All tasks succeeded with result " + res)
  }

  singleTask onSuccess { res =>
    println("Single task succeeded with result " + res)
  }

  "Function Complete"
}

println(mainFunction)

And this is the output I get:

All: done waiting for 1 seconds
All: done waiting for 3 seconds
All: done waiting for 2 seconds
Single: done waiting for 1 seconds
All tasks succeeded with result ArraySeq(All :done waiting for 1 seconds, All :done waiting for 3 seconds, All :done waiting for 2 seconds)
Single task succeeded with result Single :done waiting for 1 seconds
Function Complete

The output I expect is:

All: done waiting for 1 seconds
Single: done waiting for 1 seconds
All: done waiting for 2 seconds
All: done waiting for 3 seconds
All tasks succeeded with result ArraySeq(All :done waiting for 1 seconds, All :done waiting for 3 seconds, All :done waiting for 2 seconds)
Single task succeeded with result Single :done waiting for 1 seconds
Function Complete
Ram asked Feb 06 '23 14:02


1 Answer

Twitter's futures are more explicit about where computations are executed than the Scala standard library futures. In particular, Future.apply will capture exceptions safely (like s.c.Future), but it doesn't say anything about which thread the computation will run in. In your case each computation runs on the main thread, at the point where waitForSeconds is called, which is why the tasks execute one after another.
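A quick way to check this for yourself is to record which thread the body of Future.apply runs on. The following is only a minimal sketch: the object name FutureApplyThreadDemo and the run helper are made up for illustration, and it assumes twitter-util is on the classpath.

```scala
import com.twitter.util.{Await, Future}

// Hypothetical demo object, not part of twitter-util.
object FutureApplyThreadDemo {
  // Returns (was the future already complete when Future.apply returned?,
  //          name of the thread that evaluated the body).
  def run(): (Boolean, String) = {
    val f: Future[String] = Future {
      Thread.sleep(200)
      Thread.currentThread.getName
    }
    (f.isDefined, Await.result(f))
  }

  def main(args: Array[String]): Unit = {
    val caller = Thread.currentThread.getName
    val (done, ranOn) = run()
    println("complete on return: " + done)
    println("caller thread: " + caller + ", body ran on: " + ranOn)
  }
}
```

If the body really is evaluated on the calling thread, the future should already be complete when Future.apply returns, and both thread names should match.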

This approach has several advantages over the standard library's future API. For one thing, it keeps method signatures simpler, since there's no implicit ExecutionContext that has to be passed around everywhere. More importantly, it makes it easier to avoid context switches (here's a classic explanation by Brian Degenhardt). In this respect Twitter's Future is more like Scalaz's Task, and has essentially the same performance benefits (described for example in this blog post).

The downside of being more explicit about where computations run is that you have to be more explicit about where computations run. In your case you could write something like this:

import com.twitter.util.{ Future, FuturePool }

val pool = FuturePool.unboundedPool

def waitForSeconds(seconds: Int, container:String): Future[String] = pool {
  Thread.sleep(seconds*1000)
  println(container + ": done waiting for " + seconds + " seconds")
  container + " :done waiting for " + seconds + " seconds"
}

This won't produce exactly the output you're asking for ("Function Complete" will be printed first, and allTasks and singleTask aren't sequenced with respect to each other), but it will run the tasks in parallel on separate threads.
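To see the parallelism, you could time the collected future. This is a sketch under a few assumptions: ConcurrentWaitDemo and runAll are hypothetical names, and the program blocks with Await.result only so that a standalone demo stays alive until the pool threads finish (which matters if those threads are daemon threads). In a real service you would normally keep composing futures instead of blocking.

```scala
import com.twitter.util.{Await, Future, FuturePool}

// Hypothetical demo object, not part of twitter-util.
object ConcurrentWaitDemo {
  private val pool = FuturePool.unboundedPool

  def waitForSeconds(seconds: Int, container: String): Future[String] = pool {
    Thread.sleep(seconds * 1000L)
    container + " :done waiting for " + seconds + " seconds"
  }

  // Runs the three "All" tasks together; returns (results, elapsed millis).
  def runAll(): (List[String], Long) = {
    val start = System.nanoTime()
    val all = Future.collect(Seq(
      waitForSeconds(1, "All"),
      waitForSeconds(3, "All"),
      waitForSeconds(2, "All")))
    // Block until every task has finished (demo only).
    val results = Await.result(all).toList
    (results, (System.nanoTime() - start) / 1000000L)
  }

  def main(args: Array[String]): Unit = {
    val (results, elapsedMs) = runAll()
    println("All tasks succeeded with result " + results)
    println("Elapsed: " + elapsedMs + " ms")
  }
}
```

With the tasks on separate threads, the elapsed time should be roughly three seconds (the longest task) rather than six (the sum). Future.collect keeps the results in the order of the input sequence, not in completion order.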

(As a footnote: the FuturePool.unboundedPool in my example above is an easy way to create a future pool for a demo, and is often just fine, but it isn't appropriate for CPU-intensive computations. See the FuturePool API docs for other ways to create a future pool that will use an ExecutorService that you provide and can manage yourself.)
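As a rough illustration of that last point, FuturePool.apply accepts an ExecutorService, so you can back the pool with a fixed-size executor that you create and shut down yourself. The names BoundedPoolDemo, square, and squares are hypothetical, and sizing the pool to the number of available processors is just one plausible choice for CPU-bound work, not a recommendation from the docs.

```scala
import java.util.concurrent.{ExecutorService, Executors}
import com.twitter.util.{Await, Future, FuturePool}

// Hypothetical demo object, not part of twitter-util.
object BoundedPoolDemo {
  // Assumption: one thread per available core suits the workload.
  val executor: ExecutorService =
    Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors())

  // A FuturePool that runs its tasks on the executor above.
  val pool: FuturePool = FuturePool(executor)

  def square(n: Int): Future[Int] = pool { n * n }

  // Submits five tasks to the pool and waits for all of them (demo only).
  def squares(): List[Int] =
    Await.result(Future.collect((1 to 5).map(square))).toList

  def main(args: Array[String]): Unit = {
    try {
      println(squares()) // prints List(1, 4, 9, 16, 25)
    } finally {
      // The fixed pool's threads are non-daemon, so shut it down explicitly.
      executor.shutdown()
    }
  }
}
```

Because you own the executor here, you also decide when it is shut down. Without the shutdown call, the non-daemon worker threads would keep the JVM running after main returns.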

Travis Brown answered Feb 20 '23 00:02