TL;DR: is it better to materialize a stream per request (i.e. use short-lived streams) or to share a single stream materialization across requests, when the stream contains an outgoing HTTP request?
Details: I have a typical service that takes an HTTP request, scatters it to several 3rd-party downstream services (not controlled by me) and aggregates the results before sending them back. I'm using akka-http for the client implementation and spray for the server (legacy; it will move to akka-http over time). Schematically:
request -> map -1-*-> map -> 3rd party http -> map -*-1-> aggregation -> response
This can be achieved either by materializing a stream per request, or by materializing (parts of) the stream once and sharing it across requests.
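To make the schematic concrete, the per-request scatter-gather looks roughly like the sketch below; AggregatedResponse, parse and the parallelism of 4 are illustrative placeholders rather than the real service code:
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
import akka.NotUsed
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink, Source}

// Illustrative stand-in for the real aggregated service response.
final case class AggregatedResponse(parts: Seq[String])

def scatterGather(downstreamRequests: List[HttpRequest],
                  httpFlow: Flow[HttpRequest, Try[HttpResponse], NotUsed],
                  parse: HttpResponse => Future[String])
                 (implicit mat: Materializer, ec: ExecutionContext): Future[AggregatedResponse] =
  Source(downstreamRequests)            // 1 -> *
    .via(httpFlow)                      // 3rd party http
    .mapAsync(4) {                      // map: parse each downstream response
      case Success(response) => parse(response)
      case Failure(ex)       => Future.failed(ex)
    }
    .runWith(Sink.seq)                  // * -> 1
    .map(AggregatedResponse(_))         // aggregation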
Materializing per request incurs materialization overhead¹, and it is not clear how to leverage connection pools with it. The problem is described here (many materializations can exhaust the pool). I can wrap a pool in a long-running HTTP stream like here and wrap it in a mapAsync "upstream", but the error-handling strategy is not clear to me. When a single request fails and the stream is terminated, would it take down the pool as well? Moreover, it seems I will need to reconcile requests and responses, since they are not returned in order.
// example of stream per request
import java.util.UUID
import scala.util.Try
import akka.NotUsed
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.{Flow, Sink, Source}

val connectionFlow = Http().cachedHostConnectionPool[UUID](host, port)

val httpFlow: Flow[HttpRequest, Try[HttpResponse], NotUsed] =
  Flow[HttpRequest]
    .map(req => req -> UUID.randomUUID()) // I don't care about the id because it's a single request per stream.
    .via(connectionFlow)
    .map { case (response, _) => response }

(1 to 5).foreach { i =>
  Source.single(i)
    .map(i => HttpRequest(...)) // build the request from i
    .via(httpFlow)
    .mapAsync(1) { response =>
      // response handling logic
    }
    .runWith(Sink.last)
}
// example of stream per request with a long-running http stream
// as defined in http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#using-the-host-level-api-with-a-queue
def queueRequest(request: HttpRequest): Future[HttpResponse]

(1 to 5).foreach { i =>
  Source.single(i)
    .map(i => HttpRequest(...)) // build the request from i
    .mapAsync(1)(queueRequest)
    .mapAsync(1) { response =>
      // somehow reconcile request with response?
      // response handling logic
    }
    .runWith(Sink.last)
}
Sharing a stream across requests has a similar error-handling issue: it seems there are failure modes that can bring down that stream together with all in-flight requests. The code would be similar to the host-level API above, but with the queue fronting the whole stream.
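Concretely, the queue-fronted version I have in mind looks roughly like the following sketch (bufferSize, the overflow strategy and host/port are illustrative placeholders); each request travels with its own Promise, so a failed response completes just that Promise instead of terminating the shared stream:
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()
import system.dispatcher

val poolFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("example.com", 8080)

// One long-running materialization shared by all requests.
val queue = Source
  .queue[(HttpRequest, Promise[HttpResponse])](bufferSize = 256, OverflowStrategy.dropNew)
  .via(poolFlow)
  .toMat(Sink.foreach {
    case (Success(response), promise) => promise.success(response)
    case (Failure(ex), promise)       => promise.failure(ex)
  })(Keep.left)
  .run()

def queueRequest(request: HttpRequest): Future[HttpResponse] = {
  val promise = Promise[HttpResponse]()
  queue.offer(request -> promise).flatMap {
    case QueueOfferResult.Enqueued => promise.future
    case other                     => Future.failed(new RuntimeException(s"Request rejected: $other"))
  }
}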
Which way is better in this case?
I did try to implement both solutions, but there are many design choices at every stage of implementation, so it seems easy to screw up even on a "right" path.
¹ Although I believe it is negligible, and it is the same way the akka-http server operates.
In general it is much better to use a single connection Flow and dispatch all of your requests through that single Flow. The primary reason is that a new materialization may actually result in a new connection being formed each time (depending on your connection pool settings).
You are correct that this results in a few complications:
Ordering: By providing a random UUID as the second value in the tuple that you are passing to the connection flow, you are eliminating your ability to correlate a request to a response. That extra T value in the tuple can be used as a "correlation id" to know which HttpResponse you are getting from the Flow. In your particular example you could use the initial Int from the Range you created:
val responseSource: Source[(Try[HttpResponse], Int), _] =
  Source
    .fromIterator(() => Iterator.range(0, 5))
    .map(i => HttpRequest(...) -> i) // build the request and attach i as the correlation id
    .via(connectionFlow) // note: the pool must be typed accordingly, e.g. cachedHostConnectionPool[Int]
Now each response comes with the original Int value which you can use to process the response.
Error Handling: You are incorrect in stating "a single request fails and the stream is terminated". A single request failure DOES NOT necessarily result in the stream failing. Rather, you will simply get a (Failure(exception), Int) value from the connection flow. You now know which Int caused the failure, and you have the exception from the flow.
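Putting ordering and error handling together, consuming the correlated stream could look like this (the println handlers are just placeholders, and an implicit materializer is assumed to be in scope):
import scala.util.{Failure, Success}

// Each element carries its correlation Int, for successes and failures alike,
// so one failed request does not terminate the shared stream.
responseSource.runForeach {
  case (Success(response), i) =>
    println(s"request $i succeeded: ${response.status}")
    response.discardEntityBytes() // always consume or discard the entity bytes
  case (Failure(ex), i) =>
    println(s"request $i failed: ${ex.getMessage}")
}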