Have a stream with custom flows and at a certain stage I want to split the stream and have two alternative data handling which will merge again later.
E.g.
-> F3 -> F6
Src -> F1 -> F2 > Merge -> Sink
-> F4 -> F5
F2
should have a condition saying if data contains format A
then it should go to flow F3
, else go to F4
.
As far as I can see, each flow can only have one port in each direction (or two if bidi) - so how can I support such a flow?
You can use Broadcast
to split the stream, then you will able to use filter
or collect
on each of streams to filter required data.
val split = builder.add(Broadcast[Int](2))
Src -> F1 -> split -> filterCondA -> F3 -> F6 -> Merge -> Sink
-> filterCondB -> F4 -> F5 -> Merge
Also, there is Partition
stage which handles the number of output ports and the map function from value to port number f: T => Int
.
val portMapper(value: T): Int = value match {
case CondA => 0
case CondB => 1
}
val split = builder.add(Partition[T](2, portMapper))
Src -> F1 -> split -> F3 -> F6 -> Merge -> Sink
split -> F4 -> F5 -> Merge
Maybe there is some simpler way.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With