As described in the Akka Streams documentation, I tried to create a pool of workers (flows):
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge}

def balancer[In, Out](worker: Flow[In, Out, NotUsed], workerCount: Int): Flow[In, Out, NotUsed] = {
  import GraphDSL.Implicits._
  Flow.fromGraph(GraphDSL.create() { implicit b =>
    val balancer = b.add(Balance[In](workerCount))
    val merge = b.add(Merge[Out](workerCount))
    for (_ <- 1 to workerCount) {
      balancer ~> worker ~> merge
    }
    FlowShape(balancer.in, merge.out)
  })
}
Then I used this function to run a flow in parallel:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}

def main(args: Array[String]): Unit = {
  val system = ActorSystem()
  implicit val mat = ActorMaterializer.create(system)
  val flow = Flow[Int].map(e => {
    println(e)
    Thread.sleep(1000) // simulate 1 second of work
    e
  })
  Source(Range.apply(1, 10).toList)
    .via(balancer(flow, 3))
    .runForeach(e => {})
}
I get the expected output 1, 2, 3, 4, 5, 6, 7, 8, 9, but the numbers appear at a rate of one per second (no parallelism). What am I doing wrong?
The docs in that section are outdated; this will be fixed in the next release. Basically, all you need is to call .async
on the flow itself. Doing so draws a "box" around the flow (imagine a box with one input port and one output port) that prevents fusing across its boundary, so each worker ends up on a dedicated actor. The rest of the graph (the Balance and Merge stages) will share another actor; they will not run on separate actors, because the async box only protects the flow, and stages outside it are still fused together.
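Applied to the balancer function from the question, the only change is inside the wiring loop (a minimal sketch of that one change):

    for (_ <- 1 to workerCount) {
      // .async places an asynchronous boundary around each worker,
      // so every worker runs on its own dedicated actor
      balancer ~> worker.async ~> merge
    }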
As Endre Varga pointed out, the flow itself should be marked with .async.
But even then the behavior is not deterministic, because async stages have a default input buffer of 16 elements, so the balancer might push all of the messages into a single worker's buffer.
As a result, wiring the workers as balancer ~> worker.async.addAttributes(Attributes.inputBuffer(1, 1)) ~> merge
leads to the desired behavior.
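Putting both answers together, the complete balancer might look like this (a sketch combining the question's code with the attribute values quoted above):

    import akka.NotUsed
    import akka.stream.{Attributes, FlowShape}
    import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge}

    def balancer[In, Out](worker: Flow[In, Out, NotUsed], workerCount: Int): Flow[In, Out, NotUsed] = {
      import GraphDSL.Implicits._
      Flow.fromGraph(GraphDSL.create() { implicit b =>
        val balance = b.add(Balance[In](workerCount))
        val merge = b.add(Merge[Out](workerCount))
        for (_ <- 1 to workerCount) {
          // async boundary around each worker, with the input buffer
          // shrunk to 1 so Balance cannot queue many elements at one worker
          balance ~> worker.async.addAttributes(Attributes.inputBuffer(1, 1)) ~> merge
        }
        FlowShape(balance.in, merge.out)
      })
    }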
For an answer given by a project member, see: https://github.com/akka/akka/issues/20146#issuecomment-201381356