Akka Streams pass-through flow limiting parallelism / throughput of processing flow

I have a use case where I want to send a message to an external system, but the flow that sends the message takes and returns a type I can't use downstream. This is a great use case for a pass-through flow. I am using the implementation here. Initially I was worried that this flow wouldn't work if the processingFlow uses mapAsyncUnordered, since the processing flow may reorder messages and the ZipWith may push out a tuple with the incorrect pair. For example, in the following:

  val testSource = Source(1 until 50)
  val processingFlow: Flow[Int, Int, NotUsed] = Flow[Int].mapAsyncUnordered(10)(x => Future {
    Thread.sleep(Random.nextInt(50))
    x * 10
  })
  val passThroughFlow = PassThroughFlow(processingFlow, Keep.both)

  val future = testSource.via(passThroughFlow).runWith(Sink.seq)

I would expect that the processing flow could reorder its outputs with respect to its inputs, and that I would get a result such as:

[(30,1), (40,2), (10,3), (20,4), ...]

with the right element (the passed-through value) always in order, but the left element, which goes through my mapAsyncUnordered, potentially joined with the wrong right element, making a bad tuple.

Instead I actually get:

[(10,1), (20,2), (30,3), (40,4), ...]

Every time. Upon further investigation I noticed the code was running slowly; in fact it is not running in parallel at all, despite my mapAsyncUnordered. I tried introducing a buffer before and after the processing flow, as well as an async boundary, roughly as sketched below, but it always seems to run sequentially. This explains why the output is always ordered, but I want my processing flow to have higher throughput.
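Roughly what I tried (the buffer sizes and exact placement here are only illustrative; PassThroughFlow is the linked implementation used above):

import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Keep}

// Illustrative attempt: buffers around the processing flow plus an async boundary.
// None of these variants changed the sequential behavior described above.
val bufferedAttempt = PassThroughFlow(
  Flow[Int]
    .buffer(16, OverflowStrategy.backpressure) // buffer before
    .via(processingFlow)
    .buffer(16, OverflowStrategy.backpressure) // buffer after
    .async,                                    // async boundary
  Keep.both)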

I came up with the following workaround:

object PassThroughFlow {

  def keepRight[A, A1](processingFlow: Flow[A, A1, NotUsed]): Flow[A, A, NotUsed] =
    keepBoth[A, A1](processingFlow).map(_._2)

  def keepBoth[A, A1](processingFlow: Flow[A, A1, NotUsed]): Flow[A, (A1, A), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val broadcast = builder.add(Broadcast[A](2))
      val zip = builder.add(ZipWith[A1, A, (A1, A)]((left, right) => (left, right)))

      broadcast.out(0) ~> processingFlow ~> zip.in0
      broadcast.out(1) ~> zip.in1

      FlowShape(broadcast.in, zip.out)
    })
}

object ParallelPassThroughFlow {

  def keepRight[A, A1](parallelism: Int, processingFlow: Flow[A, A1, NotUsed]): Flow[A, A, NotUsed] =
    keepBoth(parallelism, processingFlow).map(_._2)

  def keepBoth[A, A1](parallelism: Int, processingFlow: Flow[A, A1, NotUsed]): Flow[A, (A1, A), NotUsed] = {
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val fanOut = builder.add(Balance[A](outputPorts = parallelism))
      val merger = builder.add(Merge[(A1, A)](inputPorts = parallelism, eagerComplete = false))

      Range(0, parallelism).foreach { n =>
        val passThrough = PassThroughFlow.keepBoth(processingFlow)
        fanOut.out(n) ~> passThrough ~> merger.in(n)
      }

      FlowShape(fanOut.in, merger.out)
    })
  }

}
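
For reference, a minimal usage sketch, reusing testSource and processingFlow from the first snippet (the parallelism of 10 simply mirrors the mapAsyncUnordered setting):

// Hypothetical wiring of the workaround with the earlier source and flow.
val parallelPassThrough = ParallelPassThroughFlow.keepBoth(10, processingFlow)

val resultsFuture = testSource.via(parallelPassThrough).runWith(Sink.seq)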

Two questions:

  1. In the original implementation, why does the zip inside the pass-through flow limit the parallelism of the mapAsyncUnordered?
  2. Is my workaround sound, or could it be improved? I basically fan out the input to multiple copies of the pass-through flow and merge it all back together. It seems to have the properties that I want (parallel, yet maintains order even if the processing flow reorders), but something doesn't feel right.
asked Mar 18 '19 by Luke De Feo


1 Answer

The behavior you're witnessing is a result of how broadcast and zip work: broadcast emits downstream when all of its outputs signal demand; zip waits for all of its inputs before signaling demand (and emitting downstream).

broadcast.out(0) ~> processingFlow ~> zip.in0
broadcast.out(1) ~> zip.in1

Consider the movement of the first element (1) through the above graph. 1 is broadcast to both processingFlow and zip. zip immediately receives one of its inputs (1) and waits for its other input (10), which will take a little longer to arrive. Only when zip gets both 1 and 10 does it pull for more elements from upstream, thus triggering the movement of the second element (2) through the stream. And so on.
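
To make this lock-step behavior visible, here is a small sketch using the keepBoth restatement from your question (assuming Akka 2.6, where an implicit ActorSystem provides the materializer). Logging when each element enters the processing flow shows that a new element is only pulled after zip has emitted the previous pair:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.Random

implicit val system: ActorSystem = ActorSystem("lockstep-demo")

val instrumentedProcessing: Flow[Int, Int, NotUsed] =
  Flow[Int]
    .map { x => println(s"processing starts for $x"); x } // runs just before the async stage
    .mapAsyncUnordered(10)(x => Future {
      Thread.sleep(Random.nextInt(50))
      x * 10
    })

Source(1 until 50)
  .via(PassThroughFlow.keepBoth(instrumentedProcessing))
  .runWith(Sink.foreach(pair => println(s"zip emitted $pair")))
// Prints "processing starts for n" and "zip emitted (n*10, n)" in strict alternation:
// only one element is ever in flight, despite the parallelism of 10.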

As for your ParallelPassThroughFlow, I don't know why "something doesn't feel right" to you.

answered Nov 15 '22 by Jeffrey Chung