Pool of workers with Akka Streams

As described in the Akka Streams documentation, I tried to create a pool of workers (flows):

def balancer[In, Out](worker: Flow[In, Out, NotUsed], workerCount: Int): Flow[In, Out, NotUsed] = {
    import GraphDSL.Implicits._

    Flow.fromGraph(GraphDSL.create() { implicit b =>
      val balancer = b.add(Balance[In](workerCount))
      val merge = b.add(Merge[Out](workerCount))

      for (_ <- 1 to workerCount) {
        balancer ~> worker ~> merge
      }
      FlowShape(balancer.in, merge.out)
    })
  }

Then I used this function to run a flow in parallel:

def main(args: Array[String]): Unit = {
    val system = ActorSystem()
    implicit val mat = ActorMaterializer.create(system)

    val flow = Flow[Int].map(e => {
      println(e)
      Thread.sleep(1000) // 1 second
      e
    })

    Source(Range.apply(1, 10).toList)
      .via(balancer(flow, 3))
      .runForeach(e => {})
  }

I get the expected output 1, 2, 3, 4, 5, 6, 7, 8, 9, but the numbers appear at a rate of one per second (no parallelism). What am I doing wrong?

Mihai238 asked Mar 25 '16 13:03


2 Answers

The docs in that section are outdated; this will be fixed in the next release. All you need to do is call .async on the flow itself. This draws a "box" around the flow (think of it as a box with one input port and one output port) and prevents fusing across that box, so each worker essentially runs on its own dedicated actor. The rest of the graph (the balance and merge stages) will share another actor: the async box only protects the flow, and everything outside it is still fused.
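As a minimal sketch of the change described above, the balancer from the question can be rewritten so that each worker copy gets its own asynchronous boundary. The object name AsyncPool is hypothetical, and the worker's materialized-value type is loosened to Any here purely for convenience; the only functional change from the question's code is the .async call.

```scala
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge}

// Hypothetical wrapper object, used only to make the sketch self-contained.
object AsyncPool {
  def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] = {
    import GraphDSL.Implicits._

    Flow.fromGraph(GraphDSL.create() { implicit b =>
      val balance = b.add(Balance[In](workerCount))
      val merge   = b.add(Merge[Out](workerCount))

      for (_ <- 1 to workerCount) {
        // .async puts an asynchronous boundary around this copy of the worker,
        // so it is not fused with the balance and merge stages.
        balance ~> worker.async ~> merge
      }
      FlowShape(balance.in, merge.out)
    })
  }
}
```

It is used exactly like the original, for example Source(1 to 9).via(AsyncPool.balancer(flow, 3)). Elements may now arrive at the merge stage in a different order than they were emitted.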

Endre Varga answered Sep 30 '22 10:09


As Endre Varga pointed out, the flow itself should be marked with .async.

Even then, the behavior is not deterministic: each async stage has a default input buffer of 16 elements, so with only 9 elements in the stream the balancer might send all of them to the same worker.

Limiting each worker's input buffer to a single element, as in balancer ~> worker.async.addAttributes(Attributes.inputBuffer(1, 1)) ~> merge, leads to the desired behavior.
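Put together, a runnable version of the question's program might look like the sketch below. The names BufferedPoolDemo and singleSlot are hypothetical, and the 30-second timeout is an arbitrary choice. ActorMaterializer is kept because the question uses it; newer Akka versions can, as far as I know, derive the materializer from an implicit ActorSystem instead.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, FlowShape}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; only the attribute wiring comes from the answer.
object BufferedPoolDemo {
  // Async boundary plus an input buffer of exactly one element (initial = 1, max = 1).
  def singleSlot[In, Out](worker: Flow[In, Out, Any]): Flow[In, Out, Any] =
    worker.async.addAttributes(Attributes.inputBuffer(initial = 1, max = 1))

  def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] = {
    import GraphDSL.Implicits._

    Flow.fromGraph(GraphDSL.create() { implicit b =>
      val balance = b.add(Balance[In](workerCount))
      val merge   = b.add(Merge[Out](workerCount))
      for (_ <- 1 to workerCount) {
        balance ~> singleSlot(worker) ~> merge
      }
      FlowShape(balance.in, merge.out)
    })
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("pool-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()

    // Same slow worker as in the question: print, block for one second, pass through.
    val slow = Flow[Int].map { e =>
      println(e)
      Thread.sleep(1000)
      e
    }

    val done = Source(1 to 9).via(balancer(slow, 3)).runWith(Sink.ignore)
    Await.result(done, 30.seconds) // wait for the stream to complete
    system.terminate()             // shut the actor system down so the JVM can exit
  }
}
```

With the one-element buffer, a busy worker cannot queue up extra elements, so the balancer has to hand them to whichever worker is free. The printed order is not guaranteed to be 1 through 9.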

For an answer given by a project member see: https://github.com/akka/akka/issues/20146#issuecomment-201381356

Mihai238 answered Sep 30 '22 08:09