I am very new to akka-http and have trouble running queries on the same route in parallel.
I have a route that may return its result very quickly (if cached) or slowly (heavy multithreaded CPU computations). I would like these queries to run in parallel: if a short one arrives after a long one with heavy computation, I do not want the second call to wait for the first to finish.
However, it seems that these queries do not run in parallel when they are on the same route (they do run in parallel on different routes).
I can reproduce it in a basic project:
Calling the server 3 times in parallel (with 3 Chrome tabs on http://localhost:8080/test) causes the responses to arrive at about 3.0 s, 6.0 s and 9.0 s respectively. I suppose the queries do not run in parallel.
Running on a 6-core machine (with HT) on Windows 10 with JDK 8.
build.sbt
name := "akka-http-test"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "com.typesafe.akka" %% "akka-http-experimental" % "2.4.11"
AkkaHttpTest.scala
import java.util.concurrent.Executors
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import scala.concurrent.{ExecutionContext, Future}
object AkkaHttpTest extends App {
  implicit val actorSystem = ActorSystem("system") // no application.conf here
  implicit val executionContext =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(6))
  implicit val actorMaterializer = ActorMaterializer()
  val route = path("test") {
    onComplete(slowFunc()) { slowFuncResult =>
      complete(slowFuncResult)
    }
  }
  def slowFunc()(implicit ec: ExecutionContext): Future[String] = Future {
    Thread.sleep(3000)
    "Waited 3s"
  }
  Http().bindAndHandle(route, "localhost", 8080)
  println("server started")
}
What am I doing wrong here?
Thanks for your help
EDIT: Thanks to @Ramon J Romero y Vigil, I added Future wrapping, but the problem still persists:
def slowFunc()(implicit ec: ExecutionContext): Future[String] = Future {
  Thread.sleep(3000)
  "Waited 3.0s"
}
val route = path("test") {
  onComplete(slowFunc()) { slowFuncResult =>
    complete(slowFuncResult)
  }
}
I tried the default thread pool, one defined in the config file, and a fixed thread pool (6 threads).
It seems that the onComplete directive still waits for the future to complete and then blocks the route (with the same connection).
Same problem with the Flow trick:
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
import akka.stream.scaladsl.Flow
val parallelism = 10
val reqFlow =
  Flow[HttpRequest]
    .filter(_.uri.path.toString.equalsIgnoreCase("/test"))
    .mapAsync(parallelism)(_ => slowFunc())
    .map(str => HttpResponse(status = StatusCodes.OK, entity = str))
Http().bindAndHandle(reqFlow, ...)
Thanks for your help
Each IncomingConnection is handled by the same Route, so when you "call the server 3 times in parallel" you are likely using the same connection and therefore the same Route.
The Route handles all 3 incoming HttpRequest values in an akka-stream fashion: the Route is composed of multiple stages, but each stage can only process 1 element at any given time. In your example the "complete" stage of the stream calls Thread.sleep for each incoming request, so the requests are processed one at a time.
To have multiple concurrent requests handled at the same time, you should establish a unique connection for each request.
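One way to check this from outside the browser is a small client that fires several requests at once, each from its own thread. The sketch below is only an illustration, not part of akka-http: it uses the JDK's HttpURLConnection, and the names ParallelClientSketch, fetch and fetchAll are hypothetical. It assumes that concurrent in-flight calls each get their own connection, which is how HttpURLConnection is expected to behave since only idle connections are reused.

```scala
import java.net.{HttpURLConnection, URI}
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.io.Source

// Hypothetical helper, not part of akka-http.
object ParallelClientSketch {
  // Issues one GET and returns the response body together with the time
  // (in ms, measured from `startNanos`) at which the body was fully read.
  private def fetch(url: String, startNanos: Long): (String, Long) = {
    val conn = URI.create(url).toURL.openConnection().asInstanceOf[HttpURLConnection]
    try {
      val src = Source.fromInputStream(conn.getInputStream, "UTF-8")
      try {
        val body = src.mkString
        (body, (System.nanoTime() - startNanos) / 1000000)
      } finally src.close()
    } finally conn.disconnect()
  }

  // Sends `calls` requests concurrently, one thread per request, and
  // returns each body with its completion time in milliseconds.
  def fetchAll(url: String, calls: Int): Seq[(String, Long)] = {
    val pool = Executors.newFixedThreadPool(calls)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val start = System.nanoTime()
      val all = Future.sequence(Seq.fill(calls)(Future(fetch(url, start))))
      Await.result(all, 60.seconds)
    } finally pool.shutdown()
  }
}
```

Calling ParallelClientSketch.fetchAll("http://localhost:8080/test", 3) against the server from the question returns three completion times. If they are all close to the duration of a single slowFunc call, the server is handling the requests concurrently; if they are staggered, the requests are being serialized somewhere.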
A client-side connection pool can be created along the lines of the documentation examples:
import akka.http.scaladsl.Http
// The type parameter (Int here) is the type of a value that is paired with
// each request and handed back alongside its response.
val connPoolFlow = Http().newHostConnectionPool[Int]("localhost", 8080)
This can then be integrated into a stream that makes the requests:
import akka.http.scaladsl.model.HttpRequest
import akka.stream.scaladsl.Source
val request = HttpRequest(uri = "/test")
val reqStream =
  Source(1 to 3)
    .map(id => request -> id) // the pool flow expects (HttpRequest, T) pairs
    .via(connPoolFlow)        // emits (Try[HttpResponse], T) pairs
    .runForeach { case (resp, id) => println(s"$id: $resp") }
Route Modification
If you want each HttpRequest to be processed in parallel, you can use the same Route to do so, but you must spawn off Futures inside the Route and use the onComplete directive:
def slowFunc()(implicit ec: ExecutionContext): Future[String] = Future {
  Thread.sleep(1500)
  "Waited 1.5s"
}
val route = path("test") {
  onComplete(slowFunc()) { slowFuncResult =>
    complete(slowFuncResult)
  }
}
One thing to be aware of: if you don't specify a different ExecutionContext for your sleeping function, the sleeping will run on the same thread pool that handles the routes, and you may exhaust the available threads this way. You should probably use a separate ExecutionContext for the blocking work.
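As a rough illustration of that advice, the sketch below runs the blocking calls on a dedicated fixed-size pool and measures how long a batch takes. It uses only the Scala and JDK standard libraries. The names BlockingPoolSketch and runAll, the sleepMs parameter, and the pool sizing (one thread per call) are assumptions made for the example, not something prescribed by akka-http.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper for illustration only.
object BlockingPoolSketch {
  // Stand-in for the blocking computation; it runs on whichever
  // ExecutionContext is passed in, not on the route's own pool.
  def slowFunc(sleepMs: Long)(implicit ec: ExecutionContext): Future[String] =
    Future {
      Thread.sleep(sleepMs)
      s"Waited ${sleepMs}ms"
    }

  // Starts `calls` invocations at once on a dedicated pool and returns the
  // results together with the elapsed wall-clock time in milliseconds.
  def runAll(calls: Int, sleepMs: Long): (Seq[String], Long) = {
    val pool = Executors.newFixedThreadPool(calls) // assumed sizing: one thread per call
    implicit val blockingEc: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val start = System.nanoTime()
      val all = Future.sequence(Seq.fill(calls)(slowFunc(sleepMs)))
      val results = Await.result(all, 10.seconds)
      (results, (System.nanoTime() - start) / 1000000)
    } finally pool.shutdown()
  }
}
```

With BlockingPoolSketch.runAll(3, 1500), the elapsed time should be close to one sleep rather than three, because each call gets its own thread. In the server, the same idea would mean passing such a dedicated context to slowFunc instead of the implicit one shared with the routes.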
Flow Based
Another way to handle the HttpRequests is with a stream Flow:
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
import akka.stream.scaladsl.Flow
val parallelism = 10
val reqFlow =
  Flow[HttpRequest]
    // Requests to any other path are filtered out and get no response.
    .filter(_.uri.path.toString.equalsIgnoreCase("/test"))
    .mapAsync(parallelism)(_ => slowFunc())
    .map(str => HttpResponse(status = StatusCodes.OK, entity = str))
Http().bindAndHandle(reqFlow, ...)