Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Play framework futures not being parallelised by default-dispatcher [duplicate]

I'm trying to test the ExecutionContext behaviour in a play app, and found that I'm not able to achieve any degree of parallelism when I'm using the default dispatcher either by calling as.dispatcher, as.dispatchers.lookup("akka.actor.default-dispatcher") or passing the default execution context as a parameter to my Controller class:

class HomeController @Inject()(cc: ControllerComponents)(implicit ec: ExecutionContext)

I'm building on the play examples available in here. And adding/altering the following configuration:

routes

GET    /futures    controllers.HomeController.testFutures(dispatcherId: String)

common.conf

akka {
  my-dispatcher {
    executor = "fork-join-executor"
    fork-join-executor {
      # vm-cores = 4
      parallelism-min = 4

      parallelism-factor = 2.0

      # 2x vm-cores
      parallelism-max = 8
    }
  }

  actor.default-dispatcher {
    executor = "fork-join-executor"
    fork-join-executor {
      # vm-cores = 4
      parallelism-min = 4

      parallelism-factor = 2.0

      # 2x vm-cores
      parallelism-max = 8
    }
  }
}

HomeController

@Singleton
class HomeController @Inject()(cc: ControllerComponents, as: ActorSystem) extends AbstractController(cc) {
  import HomeController._

  def testFutures(dispatcherId: String) = Action.async { implicit request =>
    implicit val dispatcher = as.dispatchers.lookup(dispatcherId)
    Future.sequence((0 to 10).map(i => Future {
      val time = 1000 + Random.nextInt(200)
      log.info(s"Sleeping #$i for $time ms")
      Thread.sleep(time)
      log.info(s"Awakening #$i")
    })).map(_ => Ok("ok"))
  }
}

For some reason, calls to http://localhost:9000/futures?dispatcherId=akka.actor.default-dispatcher (default dispatcher) don't parallelize and produce the following output:

[info] c.HomeController - Sleeping #0 for 1044 ms
[info] c.HomeController - Awakening #0
[info] c.HomeController - Sleeping #1 for 1034 ms
[info] c.HomeController - Awakening #1
[info] c.HomeController - Sleeping #2 for 1031 ms
[info] c.HomeController - Awakening #2
[info] c.HomeController - Sleeping #3 for 1065 ms
[info] c.HomeController - Awakening #3
[info] c.HomeController - Sleeping #4 for 1082 ms
[info] c.HomeController - Awakening #4
[info] c.HomeController - Sleeping #5 for 1057 ms
[info] c.HomeController - Awakening #5
[info] c.HomeController - Sleeping #6 for 1090 ms
[info] c.HomeController - Awakening #6
[info] c.HomeController - Sleeping #7 for 1165 ms
[info] c.HomeController - Awakening #7
[info] c.HomeController - Sleeping #8 for 1173 ms
[info] c.HomeController - Awakening #8
[info] c.HomeController - Sleeping #9 for 1034 ms
[info] c.HomeController - Awakening #9
[info] c.HomeController - Sleeping #10 for 1056 ms
[info] c.HomeController - Awakening #10

But calls to this http://localhost:9000/futures?dispatcherId=akka.my-dispatcher (using another dispatcher) parallelize correclty and produce the following output.

[info] c.HomeController - Sleeping #1 for 1191 ms
[info] c.HomeController - Sleeping #0 for 1055 ms
[info] c.HomeController - Sleeping #7 for 1196 ms
[info] c.HomeController - Sleeping #4 for 1121 ms
[info] c.HomeController - Sleeping #6 for 1040 ms
[info] c.HomeController - Sleeping #2 for 1016 ms
[info] c.HomeController - Sleeping #5 for 1107 ms
[info] c.HomeController - Sleeping #3 for 1165 ms
[info] c.HomeController - Awakening #2
[info] c.HomeController - Sleeping #8 for 1002 ms
[info] c.HomeController - Awakening #6
[info] c.HomeController - Sleeping #9 for 1127 ms
[info] c.HomeController - Awakening #0
[info] c.HomeController - Sleeping #10 for 1016 ms
[info] c.HomeController - Awakening #5
[info] c.HomeController - Awakening #4
[info] c.HomeController - Awakening #3
[info] c.HomeController - Awakening #1
[info] c.HomeController - Awakening #7
[info] c.HomeController - Awakening #8
[info] c.HomeController - Awakening #10
[info] c.HomeController - Awakening #9

Any ideas why this could be happening?

like image 931
mmendez.semantic Avatar asked Nov 06 '22 13:11

mmendez.semantic


1 Answers

I think the behavior is given by akka.actor.default-dispatcher that is of the type BatchingExecutor and this will try optimize in the cases of operations such as map/flatmap by executing them in the same thread to avoid unnecessary schedules . In the case where we are going to block we can indicate it with a hint as scala.concurrent.blocking (Thread.sleep (time)) and in this way a mark is stored in a ThreadLocal[BlockContext] which indicates the intention to block and does not apply the optimizations but throws the operation in another thread.

if you change this line Thread.sleep(time) for this scala.concurrent.blocking(Thread.sleep(time)) you will get the desired behavior

@Singleton
class HomeController @Inject()(cc: ControllerComponents, as: ActorSystem) extends AbstractController(cc) {
  import HomeController._

  def testFutures(dispatcherId: String) = Action.async { implicit request =>
    implicit val dispatcher = as.dispatchers.lookup(dispatcherId)
    Future.sequence((0 to 10).map(i => Future {
      val time = 1000 + Random.nextInt(200)
      log.info(s"Sleeping #$i for $time ms")
      scala.concurrent.blocking(Thread.sleep(time))
      log.info(s"Awakening #$i")
    })).map(_ => Ok("ok"))
  }
}
[info] play.api.Play - Application started (Dev) (no global state)
Sleeping #0 for 1062 ms
Sleeping #1 for 1128 ms
Sleeping #2 for 1189 ms
Sleeping #3 for 1105 ms
Sleeping #4 for 1169 ms
Sleeping #5 for 1178 ms
Sleeping #6 for 1057 ms
Sleeping #7 for 1003 ms
Sleeping #8 for 1164 ms
Sleeping #9 for 1029 ms
Sleeping #10 for 1005 ms
Awakening #7
Awakening #10
Awakening #9
Awakening #6
Awakening #0
Awakening #3
Awakening #1
Awakening #8
Awakening #4
Awakening #5
Awakening #2
like image 106
Diego Parra Avatar answered Nov 15 '22 06:11

Diego Parra