Correct use of Akka HTTP client connection pools

I need to consume a REST service using Akka's HTTP client (v2.0.2). The logical approach is to do this via a host connection pool, because we expect large numbers of simultaneous connections. The Flow for this consumes a (HttpRequest, T) and returns a (Try[HttpResponse], T). The documentation indicates that some arbitrary type T is needed to manage potential out-of-order responses to requests, but does not point out what the caller is supposed to do with the returned T.

My first attempt is the function below using an Int as T. It is called from many places to ensure that the connections use a single pool.

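import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future
import scala.util.{Failure, Random, Success}
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher // ExecutionContext used by flatMap
val pool = Http().cachedHostConnectionPool[Int]("127.0.0.1", 8888, ConnectionPoolSettings(system))
def pooledRequest(req: HttpRequest): Future[HttpResponse] = {
  val unique = Random.nextInt
  Source.single(req → unique).via(pool).runWith(Sink.head).flatMap {
    case (Success(r: HttpResponse), `unique`) ⇒ Future.successful(r)
    case (Failure(f), `unique`) ⇒ Future.failed(f)
    case (_, _) ⇒ Future.failed(new Exception("Return does not match the request"))
  }
}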

The question is: how should the client use this T? Is there a cleaner, more efficient solution? And finally, is my worry that responses may arrive out of order justified, or is it just paranoia?

David Weber, asked Jan 19 '16 09:01




2 Answers

I was a little confused by this myself initially, until I read through the docs a few times. If you only ever send single requests into the pool, the T that you supply (an Int in your case) doesn't matter, no matter how many different places share that same pool: each Source.single(...).via(pool) stream gets back exactly one response, and it is the response to its own request. So if you are always using Source.single, that key can always be 1 if you really want.

Where it does come into play, though, is when a piece of code submits multiple requests into the pool at once and wants the responses to all of them. The responses come back in the order in which they were received from the called service, not the order in which the requests were supplied to the pool. Because each request can take a different amount of time, the responses flow downstream to the Sink in the order they complete.
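The completion-order effect described above can be reproduced without Akka. The following is a stdlib-only simulation, not the pool itself: each hypothetical "request" sleeps for a made-up latency and then records its `(response, tag)` pair as it completes. `CompletionOrderDemo`, `latencyMs` and the latencies are illustrative assumptions. The arrival order depends on thread scheduling, but the tag always identifies which request a response belongs to.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CompletionOrderDemo {
  // Made-up per-request latencies: later ids answer faster, so with enough
  // threads they tend to complete before earlier ones.
  val latencyMs: Map[Int, Long] = Map(1 -> 150L, 2 -> 100L, 3 -> 50L, 4 -> 0L)

  // Fires every "request" at once and records (response, tag) pairs in the
  // order they complete, roughly like the elements leaving a pool flow.
  def run(): List[(String, Int)] = {
    val arrived = ArrayBuffer.empty[(String, Int)]
    val inFlight = (1 to 4).map { id =>
      Future {
        Thread.sleep(latencyMs(id))
        arrived.synchronized { arrived += ((s"product $id", id)) }
      }
    }
    Await.ready(Future.sequence(inFlight), 5.seconds)
    arrived.synchronized { arrived.toList }
  }

  def main(args: Array[String]): Unit = {
    val arrived = run()
    // Position in the list says nothing reliable; the tag identifies the request.
    val byTag: Map[Int, String] = arrived.map { case (resp, id) => id -> resp }.toMap
    println(s"arrival order of tags: ${arrived.map(_._2)}")
    println(s"response for id 1: ${byTag(1)}")
  }
}
```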

Say we had a service out there that accepted GET requests with a url in the form:

/product/123

Where the 123 part is the id of the product you want to look up. If I wanted to look up products 1-10 all at once, with a separate request for each, this is where the identifier becomes important: it lets me correlate each HttpResponse with the product id it belongs to. A simplified code example for this scenario would be as follows:

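// Assumes `pool` (a cachedHostConnectionPool[Int]), the implicit materializer and the imports from the question
import akka.http.scaladsl.model.HttpMethods

val requests = for (id <- 1 to 10) yield (HttpRequest(HttpMethods.GET, s"/product/$id"), id)
val responsesMapFut: Future[Map[Int, HttpResponse]] =
  Source(requests).
    via(pool).
    runFold(Map.empty[Int, HttpResponse]) {
      case (m, (util.Success(resp), id)) =>
        m + (id -> resp)

      case (m, (util.Failure(ex), id)) =>
        // Log a failure here probably; `id` says which product's request failed
        m
    }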

When I get my responses in the fold, I also conveniently have the id that each one is associated with, so I can add them to my Map keyed by id. Without this functionality, I would probably have to do something like parse the body (if it were JSON) to figure out which response was which. That is not ideal, and it doesn't cover the failure case. In this solution, I know which requests failed because I still get the identifier back.
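The point about still knowing which requests failed can be sketched as a plain function over `(Try[response], id)` pairs once they have been collected. This is a stdlib-only illustration: `SplitByTag` and `split` are hypothetical names, and `String` stands in for `HttpResponse` so it runs without Akka.

```scala
import scala.util.{Failure, Success, Try}

object SplitByTag {
  // Separates pool-style (Try[response], id) pairs into the responses that
  // succeeded and the errors that occurred, both keyed by request id.
  def split[R](results: Seq[(Try[R], Int)]): (Map[Int, R], Map[Int, Throwable]) = {
    val succeeded = results.collect { case (Success(r), id) => id -> r }.toMap
    val failed = results.collect { case (Failure(e), id) => id -> e }.toMap
    (succeeded, failed)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical results in arrival order.
    val arrived: Seq[(Try[String], Int)] = Seq(
      (Success("product 2"), 2),
      (Failure(new RuntimeException("connection reset")), 1),
      (Success("product 3"), 3)
    )
    val (ok, failed) = split(arrived)
    println(s"succeeded: ${ok.keys.toList.sorted}, failed: ${failed.keys.toList}")
  }
}
```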

I hope that clarifies things a bit for you.

cmbaxter, answered Oct 06 '22 01:10


Akka HTTP connection pools are powerful allies when consuming HTTP-based resources. If you are only going to execute one request at a time, a solution is:

def exec(req: HttpRequest): Future[HttpResponse] = {
  Source.single(req → 1)
    .via(pool)
    .runWith(Sink.head).flatMap {
      case (Success(r: HttpResponse), _) ⇒ Future.successful(r)
      case (Failure(f), _) ⇒ Future.failed(f)
    }
}
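Because the tag is ignored here, the two `case` clauses can also be collapsed with `Future.fromTry` from the Scala standard library (available since Scala 2.11). A minimal sketch, with a hand-built future standing in for the pool's output so it runs without Akka; `Unwrap` and `unwrap` are hypothetical names.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try

object Unwrap {
  // Drops the tag and lifts the Try into the Future: Success becomes a
  // completed Future, Failure a failed one (the same as exec's two cases).
  def unwrap[A, T](element: Future[(Try[A], T)])(implicit ec: ExecutionContext): Future[A] =
    element.flatMap { case (result, _) => Future.fromTry(result) }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    // Stand-in for Source.single(req -> 1).via(pool).runWith(Sink.head)
    val element = Future.successful((Try("response body"), 1))
    println(Await.result(unwrap(element), 1.second))
  }
}
```

With the pool from the question, the call would look like `Unwrap.unwrap(Source.single(req -> 1).via(pool).runWith(Sink.head))`.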

Because you are executing a single request, there is no need to disambiguate the response. However, you can also submit multiple requests to the pool at the same time. In this case we pass in an Iterable[HttpRequest] and tag each request with its index. The responses arrive in completion order, so they are put back into the order of the original requests using a SortedMap keyed by that index. You can then simply zip the requests with the responses to line them up:

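import scala.collection.immutable.SortedMap

def exec(requests: Iterable[HttpRequest]): Future[Iterable[Future[HttpResponse]]] = {
  Source(requests.zipWithIndex.toList) // toList, not toMap, so identical requests are not merged
    .via(pool)
    .runFold(SortedMap[Int, Future[HttpResponse]]()) { // key each response by its request's index
      case (m, (Success(r), idx)) ⇒ m + (idx → Future.successful(r))
      case (m, (Failure(e), idx)) ⇒ m + (idx → Future.failed(e))
    }.map(r ⇒ r.values) // SortedMap iterates in index order, i.e. request order
}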
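The SortedMap reordering step can be tried in isolation. The sketch below is stdlib-only: `Reorder` and `reorder` are hypothetical names, `String` stands in for `HttpRequest`/`HttpResponse`, and it assumes (as the pool provides) exactly one result per submitted index, which it checks before zipping.

```scala
import scala.collection.immutable.SortedMap
import scala.util.{Failure, Success, Try}

object Reorder {
  // Rebuilds submission order from (result, index) pairs that arrived in any
  // order: a SortedMap keyed by index iterates its values in index order.
  def reorder[Q, R](requests: Seq[Q], arrived: Seq[(Try[R], Int)]): Seq[(Q, Try[R])] = {
    val byIndex = arrived.foldLeft(SortedMap.empty[Int, Try[R]]) {
      case (m, (result, idx)) => m + (idx -> result)
    }
    // Zipping is only safe if every index arrived exactly once.
    require(arrived.size == requests.size && byIndex.keys.toList == requests.indices.toList,
      "expected exactly one result per request index")
    requests.zip(byIndex.values)
  }

  def main(args: Array[String]): Unit = {
    val requests = Seq("/product/1", "/product/2", "/product/3")
    // Hypothetical arrival order.
    val arrived: Seq[(Try[String], Int)] =
      Seq((Success("B"), 1), (Failure(new RuntimeException("timeout")), 2), (Success("A"), 0))
    reorder(requests, arrived).foreach(println)
  }
}
```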

A Future of an Iterable of Futures is useful if you need to unpack the results your own way. A simpler result can be obtained by flattening it with Future.sequence:

def execFlatten(requests: Iterable[HttpRequest]): Future[Iterable[HttpResponse]] = {
  Source(requests.zipWithIndex.toList) // toList, not toMap, so identical requests are not merged
    .via(pool)
    .runFold(SortedMap[Int, Future[HttpResponse]]()) {
      case (m, (Success(r), idx)) ⇒ m + (idx → Future.successful(r))
      case (m, (Failure(e), idx)) ⇒ m + (idx → Future.failed(e))
    }.flatMap(r ⇒ Future.sequence(r.values)) // fails as a whole if any request failed
}
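One consequence of `Future.sequence` worth knowing: if any single request failed, the Future returned by `execFlatten` fails as a whole and the successful responses are lost. A stdlib-only sketch contrasting that with a variant that keeps every outcome as a `Try`; `allOrNothing` and `keepOutcomes` are hypothetical names, and plain `Future[Int]` values stand in for pool responses.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object Flatten {
  // What execFlatten does: a single failed element fails the whole Future.
  def allOrNothing[A](fs: Seq[Future[A]])(implicit ec: ExecutionContext): Future[Seq[A]] =
    Future.sequence(fs)

  // Alternative: turn each Future[A] into a Future[Try[A]] that always
  // succeeds, so the sequenced result keeps every outcome.
  def keepOutcomes[A](fs: Seq[Future[A]])(implicit ec: ExecutionContext): Future[Seq[Try[A]]] =
    Future.sequence(fs.map(f => f.map(a => Success(a): Try[A]).recover { case e => Failure(e) }))

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val fs = Seq(Future.successful(1), Future.failed[Int](new RuntimeException("boom")), Future.successful(3))
    println(Await.ready(allOrNothing(fs), 1.second).value)
    println(Await.result(keepOutcomes(fs), 1.second))
  }
}
```

Which behaviour is preferable depends on whether a partial set of responses is still useful to the caller.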

I have made a gist with all the imports and wrappers needed to make a client for consuming HTTP services.

A special thanks to @cmbaxter for his neat example.

David Weber, answered Oct 05 '22 23:10