I have a use case where I want to send a message to an external system, but the flow that sends this message takes and returns a type I can't use downstream. This is a great use case for the pass-through flow. I am using the implementation here. Initially I was worried that if the processingFlow uses a mapAsyncUnordered, then this flow wouldn't work, since the processing flow may reorder messages and the ZipWith may push out a tuple with an incorrect pair. E.g. in the following example:
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.Random

val testSource = Source(1 until 50)
val processingFlow: Flow[Int, Int, NotUsed] = Flow[Int].mapAsyncUnordered(10)(x => Future {
  Thread.sleep(Random.nextInt(50))
  x * 10
})
val passThroughFlow = PassThroughFlow(processingFlow, Keep.both)
val future = testSource.via(passThroughFlow).runWith(Sink.seq)
I would expect that the processing flow could reorder its outputs with respect to its input, and I would get a result such as:
[(30,1), (40,2), (10,3), (10,4), ...]
with the right element (the passed-through value) always being in order, but the left element, which goes through my mapAsyncUnordered, potentially being joined with an incorrect element to make a bad tuple.
Instead I actually get:
[(10,1), (20,2), (30,3), (40,4), ...]
Every time. Upon further investigation I noticed the code was running slowly; in fact, it's not running in parallel at all, despite my mapAsyncUnordered. I tried introducing a buffer before and after, as well as an async boundary, but it always seems to run sequentially. This explains why it's always ordered, but I want my processing flow to have higher throughput.
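For concreteness, one of those attempts looked roughly like this (a hedged reconstruction, not the exact code I ran; the buffer size of 16 and the buffer placement are illustrative):

import akka.stream.OverflowStrategy

// Buffers around the processing flow plus an async boundary:
// the stream still moves one element at a time through the pass-through graph.
val bufferedProcessing: Flow[Int, Int, NotUsed] =
  Flow[Int]
    .buffer(16, OverflowStrategy.backpressure)
    .via(processingFlow)
    .buffer(16, OverflowStrategy.backpressure)
    .async

val stillSequential =
  testSource.via(PassThroughFlow(bufferedProcessing, Keep.both)).runWith(Sink.seq)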
I came up with the following workaround:
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, GraphDSL, ZipWith}

object PassThroughFlow {
  def keepRight[A, A1](processingFlow: Flow[A, A1, NotUsed]): Flow[A, A, NotUsed] =
    keepBoth[A, A1](processingFlow).map(_._2)

  def keepBoth[A, A1](processingFlow: Flow[A, A1, NotUsed]): Flow[A, (A1, A), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      // Each element goes both into the processing flow and straight to the zip.
      val broadcast = builder.add(Broadcast[A](2))
      val zip = builder.add(ZipWith[A1, A, (A1, A)]((left, right) => (left, right)))

      broadcast.out(0) ~> processingFlow ~> zip.in0
      broadcast.out(1) ~> zip.in1

      FlowShape(broadcast.in, zip.out)
    })
}
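As a quick usage sketch of the building block (my note about the output is an assumption based on the ordering behavior described above):

// keepRight runs processingFlow purely for its effects and re-emits the inputs:
val originals: Future[Seq[Int]] =
  testSource.via(PassThroughFlow.keepRight(processingFlow)).runWith(Sink.seq)
// Completes with Seq(1, 2, ..., 49); each element is emitted only once its
// processed counterpart has come out of processingFlow.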
import akka.stream.scaladsl.{Balance, Merge}

object ParallelPassThroughFlow {
  def keepRight[A, A1](parallelism: Int, processingFlow: Flow[A, A1, NotUsed]): Flow[A, A, NotUsed] =
    keepBoth(parallelism, processingFlow).map(_._2)

  def keepBoth[A, A1](parallelism: Int, processingFlow: Flow[A, A1, NotUsed]): Flow[A, (A1, A), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      // Fan out to `parallelism` independent pass-through sub-flows, then merge.
      val fanOut = builder.add(Balance[A](outputPorts = parallelism))
      val merger = builder.add(Merge[(A1, A)](inputPorts = parallelism, eagerComplete = false))

      Range(0, parallelism).foreach { n =>
        val passThrough = PassThroughFlow.keepBoth(processingFlow)
        fanOut.out(n) ~> passThrough ~> merger.in(n)
      }

      FlowShape(fanOut.in, merger.out)
    })
}
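Wiring the workaround into the test stream above (the parallelism of 10 just mirrors the mapAsyncUnordered parallelism; matching them is my choice, not a requirement):

// Ten independent pass-through sub-flows, balanced and merged:
val parallelFuture =
  testSource
    .via(ParallelPassThroughFlow.keepBoth(10, processingFlow))
    .runWith(Sink.seq)
// Pairing stays correct because each zip only ever sees its own sub-flow's
// elements, but tuples from different sub-flows may interleave, so the
// overall output order is no longer guaranteed.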
Two questions:
1. Why does the pass-through flow above run sequentially, ignoring the parallelism of my mapAsyncUnordered?
2. Is my ParallelPassThroughFlow workaround sound? Something doesn't feel right about it.
The behavior you're witnessing is a result of how broadcast and zip work: broadcast emits downstream when all of its outputs signal demand; zip waits for all of its inputs before signaling demand (and emitting downstream).
broadcast.out(0) ~> processingFlow ~> zip.in0
broadcast.out(1) ~> zip.in1
Consider the movement of the first element (1) through the above graph. 1 is broadcast to both processingFlow and zip. zip immediately receives one of its inputs (1) and waits for its other input (10), which will take a little longer to arrive. Only when zip gets both 1 and 10 does it pull for more elements from upstream, thus triggering the movement of the second element (2) through the stream. And so on.
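You can observe this lock-step directly by wrapping processingFlow with a pair of logging stages (a sketch; the println probe is just for illustration):

// Hypothetical probe: log when elements enter and leave the processing flow.
val probed: Flow[Int, Int, NotUsed] =
  Flow[Int]
    .map { x => println(s"enter: $x"); x }
    .via(processingFlow)
    .map { y => println(s"exit:  $y"); y }

Source(1 to 5)
  .via(PassThroughFlow.keepBoth(probed))
  .runWith(Sink.foreach(println))
// Prints strictly alternating enter/exit pairs: element n + 1 never enters
// the processing flow before element n's tuple has been emitted by the zip.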
As for your ParallelPassThroughFlow, I don't know why "something doesn't feel right" to you.