We observed a strange behavior when we tried to start a number of futures from within an actor's receive method. If we use our configured dispatchers as ExecutionContext, the futures run on the same thread and sequentially. If we use ExecutionContext.Implicits.global, the futures run in parallel as expected.
We boiled down the code to the following example (a more complete example is below):
implicit val ec = context.getDispatcher
Future{ doWork() } // <-- all running parallel
Future{ doWork() }
Future{ doWork() }
Future{ doWork() }
Future {
Future{ doWork() }
Future{ doWork() } // <-- NOT RUNNING PARALLEL!!! WHY!!!
Future{ doWork() }
Future{ doWork() }
}
A compilable example would be like this:
import akka.actor.ActorSystem
import scala.concurrent.{ExecutionContext, Future}
object WhyNotParallelExperiment extends App {
val actorSystem = ActorSystem(s"Experimental")
// Futures not started in future: running in parallel
startFutures(runInFuture = false)(actorSystem.dispatcher)
Thread.sleep(5000)
// Futures started in future: running in sequentially. Why????
startFutures(runInFuture = true)(actorSystem.dispatcher)
Thread.sleep(5000)
actorSystem.terminate()
private def startFutures(runInFuture: Boolean)(implicit executionContext: ExecutionContext): Unit = {
if (runInFuture) {
Future{
println(s"Start Futures on thread ${Thread.currentThread().getName()}")
(1 to 9).foreach(startFuture)
println(s"Started Futures on thread ${Thread.currentThread().getName()}")
}
} else {
(11 to 19).foreach(startFuture)
}
}
private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future{
println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
Thread.sleep(500)
println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
}
}
We tried with both, thread-pool-executor and fork-join-executor, with the same result.
Are we using futures in the wrong way? How should you then spawn parallel tasks?
From the description of Akka's internal BatchingExecutor
(emphasis mine):
Mixin trait for an Executor which groups multiple nested
Runnable.run()
calls into a single Runnable passed to the original Executor. This can be a useful optimization because it bypasses the original context's task queue and keeps related (nested) code on a single thread which may improve CPU affinity. However, if tasks passed to the Executor are blocking or expensive, this optimization can prevent work-stealing and make performance worse....A batching executor can create deadlocks if code does not usescala.concurrent.blocking
when it should, because tasks created within other tasks will block on the outer task completing.
If you're using a dispatcher that mixes in BatchingExecutor
--namely, a subclass of MessageDispatcher
--you could use the scala.concurrent.blocking
construct to enable parallelism with nested Futures:
Future {
Future {
blocking {
doBlockingWork()
}
}
}
In your example, you would add blocking
in the startFuture
method:
private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future {
blocking {
println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}")
Thread.sleep(500)
println(s"Future $id finished on thread ${Thread.currentThread().getName()}")
}
}
Sample output from running startFutures(true)(actorSystem.dispatcher)
with the above change:
Start Futures on thread Experimental-akka.actor.default-dispatcher-2
Started Futures on thread Experimental-akka.actor.default-dispatcher-2
Future 1 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-2
Future 3 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-3
Future 5 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-6
Future 7 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-7
Future 4 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-5
Future 9 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-10
Future 6 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-8
Future 8 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-9
Future 2 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-4
Future 1 finished on thread Experimental-akka.actor.default-dispatcher-2
Future 3 finished on thread Experimental-akka.actor.default-dispatcher-3
Future 5 finished on thread Experimental-akka.actor.default-dispatcher-6
Future 4 finished on thread Experimental-akka.actor.default-dispatcher-5
Future 8 finished on thread Experimental-akka.actor.default-dispatcher-9
Future 7 finished on thread Experimental-akka.actor.default-dispatcher-7
Future 9 finished on thread Experimental-akka.actor.default-dispatcher-10
Future 6 finished on thread Experimental-akka.actor.default-dispatcher-8
Future 2 finished on thread Experimental-akka.actor.default-dispatcher-4
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With