akka-http queries do not run in parallel

I am very new to akka-http and have troubles to run queries on the same route in parallel.

I have a route that may return the result very quickly (if cached) or not (heavy CPU multithreaded computations). I would like to run these queries in parallel, in case a short one arrives after a long one with heavy computation, I do not want the second call to wait for the first to finish.

However it seems that these queries do not run in parallel if they are on the same route (run in parallel if on different routes)

I can reproduce it in a basic project:

Calling the server 3 times in parallel (with 3 Chrome tabs on http://localhost:8080/test) causes the responses to arrive at 3.0s, 6.0s and 9.0s respectively. I suppose the queries do not run in parallel.

Running on a 6-core (with HT) machine on Windows 10 with JDK 8.

build.sbt

name := "akka-http-test"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "com.typesafe.akka" %% "akka-http-experimental" % "2.4.11"

AkkaHttpTest.scala

import java.util.concurrent.Executors

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer

import scala.concurrent.{ExecutionContext, Future}

object AkkaHttpTest extends App {

  implicit val actorSystem = ActorSystem("system") // no application.conf here
  implicit val executionContext =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(6))
  implicit val actorMaterializer = ActorMaterializer()

  val route = path("test") {
    onComplete(slowFunc()) { slowFuncResult =>
      complete(slowFuncResult)
    }
  }

  def slowFunc()(implicit ec: ExecutionContext): Future[String] = Future {
    Thread.sleep(3000)
    "Waited 3s"
  }

  Http().bindAndHandle(route, "localhost", 8080)

  println("server started")
}

What am I doing wrong here?

Thanks for your help

EDIT: Thanks to @Ramon J Romero y Vigil, I added the Future wrapping, but the problem still persists.

def slowFunc()(implicit ec : ExecutionContext) : Future[String] = Future {
  Thread.sleep(3000)
  "Waited 3.0s"
}

val route = path("test") {
  onComplete(slowFunc()) { slowFuncResult =>
    complete(slowFuncResult)
  }
}

Tried with the default thread pool, the one defined above, and a fixed thread pool (6 threads).

It seems that the onComplete directive still waits for the future to complete and then blocks the Route (for the same connection).

Same problem with the Flow trick

import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
import akka.stream.scaladsl.Flow

val parallelism = 10

val reqFlow =
  Flow[HttpRequest].filter(_.getUri().path().equalsIgnoreCase("/test"))
                   .mapAsync(parallelism)(_ => slowFunc())
                   .map(str => HttpResponse(status = StatusCodes.OK, entity = str))

Http().bindAndHandle(reqFlow, ...)

Thanks for your help

Asked by ogen on Feb 06 '23.

1 Answer

Each IncomingConnection is handled by the same Route, so when you "call the server 3 times in parallel" you are likely using the same Connection, and therefore the same Route.

The Route is handling all 3 incoming HttpRequest values in an akka-stream fashion, i.e. the Route is composed of multiple stages but each stage can only process 1 element at any given time. In your example the "complete" stage of the stream will call Thread.sleep for each incoming Request and process each Request one at a time.
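For intuition, here is a rough sketch (an illustration only, not part of the original setup) of the same binding written with the lower-level Http().bind API: every IncomingConnection gets the same handler flow, and requests pipelined on one connection pass through it sequentially. It assumes the implicit ActorSystem and ActorMaterializer from the question are in scope.

import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.Sink

// Sketch: materialize the route's handler flow once per incoming connection.
// Requests arriving on a single connection are processed one after another.
Http().bind("localhost", 8080)
  .to(Sink.foreach { connection =>
    println(s"New connection from ${connection.remoteAddress}")
    connection.handleWith(Route.handlerFlow(route))
  })
  .run()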

To get multiple concurrent requests handled at the same time you should establish a unique connection for each request.
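As a quick client-side check (a sketch, not from the original answer: it assumes the server above is running and that the implicit ActorSystem, ActorMaterializer and ExecutionContext are in scope), firing the requests through Http().singleRequest lets the underlying pool open separate connections (up to 4 per host by default), so all three responses should arrive after roughly 3s instead of 3s / 6s / 9s:

import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest

// Each call goes through the host connection pool, which can open a new
// connection per in-flight request (up to the configured max-connections).
val responses = (1 to 3).map { _ =>
  Http().singleRequest(HttpRequest(uri = "http://localhost:8080/test"))
}

responses.foreach(_.foreach(resp => println(resp.status)))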

Alternatively, an explicit client-side connection pool can be created, similar to the documentation examples:

import akka.http.scaladsl.Http

// the type parameter is the per-request correlation value carried through the pool
val connPoolFlow = Http().newHostConnectionPool[Unit]("localhost", 8080)

This can then be integrated into a stream that makes the requests:

import akka.http.scaladsl.model.Uri._
import akka.http.scaladsl.model.HttpRequest

val request = HttpRequest(uri="/test")

import akka.stream.scaladsl.{Sink, Source}

val reqStream =
  Source.fromIterator(() => Iterator.continually(request).take(3))
        .map(req => (req, ()))   // the pool flow expects (HttpRequest, T) pairs
        .via(connPoolFlow)
        .to(Sink.foreach { case (respTry, _) => println(respTry) })
        .run()

Route Modification

If you want each HttpRequest to be processed in parallel then you can use the same Route to do so but you must spawn off Futures inside of the Route and use the onComplete directive:

def slowFunc()(implicit ec : ExecutionContext) : Future[String] = Future {
  Thread.sleep(1500)
  "Waited 1.5s"
}

val route = path("test") {
  onComplete(slowFunc()) { slowFuncResult =>
    complete(slowFuncResult)
  }
}

One thing to be aware of: if you don't specify a different ExecutionContext for your sleeping function then the same thread pool that serves the routes will be used for the sleeping, and you may exhaust its available threads. You should probably use a separate ExecutionContext for the sleeping...
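For example, a minimal sketch of routing the blocking call onto its own pool (the pool size here is an arbitrary assumption):

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// Dedicated pool for the blocking/CPU-heavy work, separate from the route's dispatcher.
val blockingEc: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(6))

def slowFunc(): Future[String] = Future {
  Thread.sleep(1500)   // stands in for the heavy computation
  "Waited 1.5s"
}(blockingEc)          // explicitly run on the dedicated pool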

Flow Based

One other way to handle the HttpRequests is with a stream Flow:

import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
import akka.stream.scaladsl.Flow

val parallelism = 10

val reqFlow =
  Flow[HttpRequest].filter(_.getUri().path().equalsIgnoreCase("/test"))
                   .mapAsync(parallelism)(_ => slowFunc())
                   .map(str => HttpResponse(status = StatusCodes.OK, entity = str))

Http().bindAndHandle(reqFlow, ...)
Answered by Ramón J Romero y Vigil on Feb 14 '23.