I have a service (let's call it Service A) that uses the Akka HTTP server to handle incoming requests. I also have a third-party application (Service B) that provides several web services. The purpose of Service A is to transform client requests, call one or more web services of Service B, merge/transform the results, and serve them back to the client.
I am using actors for some parts and plain Futures for others. To call Service B, I use the Akka HTTP client:
Http.get(actorSystem)
    .singleRequest(HttpRequest.create().withUri("http://127.0.0.1:8082/test"), materializer)
    .onComplete(...)
The issue is that a new flow is created for each Service A request, and when there are many concurrent connections it fails with this error: akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32]
I already asked this question and got a suggestion to use a single Flow: "How to properly call Akka HTTP client for multiple (10k - 100k) requests?"
While it works for a batch of requests coming from a single place, I don't know how to use a single Flow from all my concurrent request handlers.
What is the correct "Akka-way" to do it?
I think you could use Source.queue to buffer your requests. The code below assumes that you need the answer from the third-party service, so having a Future[HttpResponse] is very welcome. This way you can also provide an overflow strategy to prevent resource starvation.
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}
import scala.util.{Failure, Success}
import scala.concurrent.ExecutionContext.Implicits.global
implicit val system = ActorSystem("main")
implicit val materializer = ActorMaterializer()
val pool = Http().cachedHostConnectionPool[Promise[HttpResponse]](host = "google.com", port = 80)
val queue = Source.queue[(HttpRequest, Promise[HttpResponse])](10, OverflowStrategy.dropNew)
  .via(pool)
  .toMat(Sink.foreach({
    case ((Success(resp), p)) => p.success(resp)
    case ((Failure(e), p)) => p.failure(e)
  }))(Keep.left)
  .run
val promise = Promise[HttpResponse]
val request = HttpRequest(uri = "/") -> promise
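val response = queue.offer(request).flatMap {
  // In current Akka versions offer yields a QueueOfferResult rather than a Boolean
  case akka.stream.QueueOfferResult.Enqueued => promise.future
  case other => Future.failed(new RuntimeException(s"Request was not enqueued: $other"))
}
Await.ready(response, 3.seconds)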
(code copied from my blog post)
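If you are not familiar with Akka Streams, the idea behind the code above (a bounded buffer that rejects new requests when it is full, plus one promise per request that is completed when the response arrives) can be sketched with plain JDK classes. This is only an analogy and not Akka API: BoundedRequestQueue, offer and drain are hypothetical names, the backend is just a function, and drain is called by hand here, whereas the Akka stream pulls from the queue continuously.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical plain-JDK analogy of Source.queue(bufferSize, dropNew) in front of one shared pool.
final class BoundedRequestQueue<Req, Resp> {

    // One buffered element: the request plus the promise its caller is waiting on.
    private static final class Entry<Q, R> {
        final Q request;
        final CompletableFuture<R> promise;

        Entry(Q request, CompletableFuture<R> promise) {
            this.request = request;
            this.promise = promise;
        }
    }

    private final BlockingQueue<Entry<Req, Resp>> buffer;
    private final Function<Req, Resp> backend; // stands in for the connection pool

    BoundedRequestQueue(int bufferSize, Function<Req, Resp> backend) {
        this.buffer = new ArrayBlockingQueue<>(bufferSize);
        this.backend = backend;
    }

    // Safe to call from many request handlers at once; never blocks.
    // When the buffer is full the new request is dropped and its future fails.
    CompletableFuture<Resp> offer(Req request) {
        CompletableFuture<Resp> promise = new CompletableFuture<>();
        if (!buffer.offer(new Entry<>(request, promise))) {
            promise.completeExceptionally(
                new IllegalStateException("Buffer full, request dropped"));
        }
        return promise;
    }

    // Processes everything currently buffered and completes each promise.
    // Returns the number of requests that were processed.
    int drain() {
        int processed = 0;
        Entry<Req, Resp> entry;
        while ((entry = buffer.poll()) != null) {
            try {
                entry.promise.complete(backend.apply(entry.request));
            } catch (RuntimeException ex) {
                entry.promise.completeExceptionally(ex);
            }
            processed++;
        }
        return processed;
    }
}
```

The point of the sketch is that every caller shares one queue and gets its own future back, so concurrent handlers never create their own pool or flow; they only offer into the shared one and react to either the response or the rejection.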
Here is a Java version of the accepted answer:
final Flow<
    Pair<HttpRequest, Promise<HttpResponse>>,
    Pair<Try<HttpResponse>, Promise<HttpResponse>>,
    NotUsed> flow =
    Http.get(actorSystem).superPool(materializer);
final SourceQueue<Pair<HttpRequest, Promise<HttpResponse>>> queue =
    Source.<Pair<HttpRequest, Promise<HttpResponse>>>queue(BUFFER_SIZE, OverflowStrategy.dropNew())
        .via(flow)
        .toMat(Sink.foreach(p -> p.second().complete(p.first())), Keep.left())
        .run(materializer);
...
public CompletionStage<HttpResponse> request(HttpRequest request) {
    log.debug("Making request {}", request);
    Promise<HttpResponse> promise = Futures.promise();
    return queue.offer(Pair.create(request, promise))
        .thenCompose(buffered -> {
            if (buffered instanceof QueueOfferResult.Enqueued$) {
                return FutureConverters.toJava(promise.future())
                    .thenApply(resp -> {
                        if (log.isDebugEnabled()) {
                            log.debug("Got response {} {}", resp.status(), resp.getHeaders());
                        }
                        return resp;
                    });
            } else {
                log.error("Could not buffer request {}", request);
                return CompletableFuture.completedFuture(HttpResponse.create().withStatus(StatusCodes.SERVICE_UNAVAILABLE));
            }
        });
}