Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Is connection pooling in akka-http using the source queue Implementation thread safe?

Refering to the following implementation mentioned in:

http://doc.akka.io/docs/akka-http/10.0.5/scala/http/client-side/host-level.html

val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")
val queue =
  Source.queue[(HttpRequest, Promise[HttpResponse])](QueueSize, OverflowStrategy.dropNew)
    .via(poolClientFlow)
    .toMat(Sink.foreach({
      case ((Success(resp), p)) => p.success(resp)
      case ((Failure(e), p))    => p.failure(e)
    }))(Keep.left)
    .run()

Is it thread safe to offer the queue http requests from multiple threads ? If it isn't, what is the best way to implement such requirement ? using a dedicated actor perhaps ?

like image 809
Niv-Nik Avatar asked Mar 25 '17 07:03

Niv-Nik


2 Answers

No, it is not thread safe, as per the api doc: SourceQueue that current source is materialized to is for single thread usage only.

A dedicated actor would work fine but, if you can, using Source.actorRef (doc link) instead of Source.queue would be easier.

In general, the downside of Source.actorRef is the lack of backpressure, but as you use OverflowStrategy.dropNew, it is clear you don't expect backpressure. As such, you can get the same behaviour using Source.actorRef.

like image 140
Frederic A. Avatar answered Nov 15 '22 01:11

Frederic A.


As correctly stated by @frederic-a, SourceQueue is not a thread safe solution.

Perhaps a fit solution would be to use a MergeHub(see docs for more details). This effectively allows you to run your graph in two stages.

  1. from your hub to your sink (this materializes to a sink)
  2. distribute the sink materialized at point 1 to your users. Sinks are actually designed to be distributed, so this is perfectly safe.

This solution would be safe backpressure-wise, as per MergeHub behaviour

If the consumer cannot keep up then all of the producers are backpressured.

Code example below:

val reqSink: Sink[(HttpRequest, Promise[HttpResponse]), NotUsed] =
  MergeHub.source[(HttpRequest, Promise[HttpResponse])](perProducerBufferSize = 16)
  .via(poolClientFlow)
  .toMat(Sink.foreach({
    case ((Success(resp), p)) => p.success(resp)
    case ((Failure(e), p))    => p.failure(e)
  }))(Keep.left)
  .run()

// on the user threads

val source: Source[(HttpRequest, Promise[HttpResponse]), NotUsed] = ???
source.runWith(reqSink)
like image 28
Stefano Bonetti Avatar answered Nov 15 '22 01:11

Stefano Bonetti