Refering to the following implementation mentioned in:
http://doc.akka.io/docs/akka-http/10.0.5/scala/http/client-side/host-level.html
val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")
val queue =
Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew)
.via(poolClientFlow)
.toMat(Sink.foreach({
case ((Success(resp), p)) => p.success(resp)
case ((Failure(e), p)) => p.failure(e)
}))(Keep.left)
.run()
Is it thread safe to offer the queue http requests from multiple threads ? If it isn't, what is the best way to implement such requirement ? using a dedicated actor perhaps ?
No, it is not thread safe, as per the api doc: SourceQueue that current source is materialized to is for single thread usage only.
A dedicated actor would work fine but, if you can, using Source.actorRef
(doc link) instead of Source.queue
would be easier.
In general, the downside of Source.actorRef
is the lack of backpressure, but as you use OverflowStrategy.dropNew
, it is clear you don't expect backpressure. As such, you can get the same behaviour using Source.actorRef
.
As correctly stated by @frederic-a, SourceQueue
is not a thread safe solution.
Perhaps a fit solution would be to use a MergeHub
(see docs for more details). This effectively allows you to run your graph in two stages.
Sink
s are actually designed to be distributed, so this is perfectly safe. This solution would be safe backpressure-wise, as per MergeHub
behaviour
If the consumer cannot keep up then all of the producers are backpressured.
Code example below:
val reqSink: Sink[(HttpRequest, Promise[HttpResponse]), NotUsed] =
MergeHub.source[(HttpRequest, Promise[HttpResponse])](perProducerBufferSize = 16)
.via(poolClientFlow)
.toMat(Sink.foreach({
case ((Success(resp), p)) => p.success(resp)
case ((Failure(e), p)) => p.failure(e)
}))(Keep.left)
.run()
// on the user threads
val source: Source[(HttpRequest, Promise[HttpResponse]), NotUsed] = ???
source.runWith(reqSink)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With