We started to implement the Source.queue[HttpRequest]
pattern mentioned in the docs: http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#examples
This is the (reduced) example from the documentation
val poolClientFlow = Http()
.cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")
val queue =
Source.queue[(HttpRequest, Promise[HttpResponse])](
QueueSize, OverflowStrategy.dropNew
)
.via(poolClientFlow)
.toMat(Sink.foreach({
case ((Success(resp), p)) => p.success(resp)
case ((Failure(e), p)) => p.failure(e)
}))(Keep.left)
.run()
def queueRequest(request: HttpRequest): Future[HttpResponse] = {
val responsePromise = Promise[HttpResponse]()
queue.offer(request -> responsePromise).flatMap {
case QueueOfferResult.Enqueued => responsePromise.future
case QueueOfferResult.Dropped => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
case QueueOfferResult.Failure(ex) => Future.failed(ex)
case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))
}
}
val responseFuture: Future[HttpResponse] = queueRequest(HttpRequest(uri = "/"))
The docs state that using Source.single(request)
is an anti-pattern and should be avoid. However it doesn't clarify why and what implications come by using Source.queue
.
At this place we previously showed an example that used the
Source.single(request).via(pool).runWith(Sink.head)
. In fact, this is an anti-pattern that doesn’t perform well. Please either supply requests using a queue or in a streamed fashion as shown below.
OverflowStrategy
and matching over the QueueOfferResult
These are the questions that came up, when we started implementing this pattern in our application.
The queue implementation is not thread safe. When we use the queue in different routes / actors we have this scenario that:
A enqueued request can override the latest enqueued request, thus leading to an unresolved Future.
UPDATE
This issue as been addressed in akka/akka/issues/23081. The queue is in fact thread safe.
What happens when request are being filtered? E.g. when someone changes the implementation
Source.queue[(HttpRequest, Promise[HttpResponse])](
QueueSize, OverflowStrategy.dropNew)
.via(poolClientFlow)
// only successful responses
.filter(_._1.isSuccess)
// failed won't arrive here
.to(Sink.foreach({
case ((Success(resp), p)) => p.success(resp)
case ((Failure(e), p)) => p.failure(e)
}))
Will the Future not resolve? With a single request flow this is straightforward:
Source.single(request).via(poolClientFlow).runWith(Sink.headOption)
The difference between the QueueSize
and max-open-requests
is not clear. In the end, both are buffers. Our implementation ended up using QueueSize == max-open-requests
Until now I have found two reasons for using Source.queue
over Source.single
thanks in advance, Muki
I'll answer each of your questions directly and then give a general indirect answer to the overall problem.
probably a performance gain?
You are correct that there is a Flow
materialized for each IncomingConnection
but there is still a performance gain to be had if a Connection has multiple requests coming from it.
What happens when request are being filtered?
In general streams do not have a 1:1 mapping between Source elements and Sink Elements. There can be 1:0, as in your example, or there can be 1:many if a single request somehow spawned multiple responses.
QueueSize vs max-open-request?
This ratio would depend on the speed with which elements are being offered to the queue and the speed with which http requests are being processed into responses. There is no pre-defined ideal solution.
GENERAL REDESIGN
In most cases a Source.queue
is used because some upstream function is creating input elements dynamically and then offering them to the queue, e.g.
val queue = ??? //as in the example in your question
queue.offer(httpRequest1)
queue.offer(httpRequest2)
queue.offer(httpRequest3)
This is poor design because whatever entity or function that is being used to create each input element could itself be part of the stream Source, e.g.
val allRequests = Iterable(httpRequest1, httpRequest2, httpRequest3)
//no queue necessary
val allResponses : Future[Seq[HttpResponse]] =
Source(allRequests)
.via(poolClientFlow)
.to(Sink.seq[HttpResponse])
.run()
Now there is no need to worry about the queue, max queue size, etc. Everything is bundled into a nice compact stream.
Even if the source of requests is dynamic, you can still usually use a Source. Say we are getting the request paths from the console stdin, this can still be a complete stream:
import scala.io.{Source => ioSource}
val consoleLines : () => Iterator[String] =
() => ioSource.stdin.getLines()
Source
.fromIterator(consoleLines)
.map(consoleLine => HttpRequest(GET, uri = Uri(consoleLine)))
.via(poolClientFlow)
.to(Sink.foreach[HttpResponse](println))
.run()
Now, even if each line is typed into the console at random intervals the stream can still behave reactively without a Queue.
The only instance I've every seen a queue, or Source.ActorRef
, as being absolutely necessary is when you have to create a callback function that gets passed into a third party API. This callback function will have to offer the incoming elements to the queue.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With