
Alternative flows based on condition for akka stream

I have a stream built from custom flows. At a certain stage I want to split the stream into two alternative processing paths, which merge again later.

E.g.

                  -> F3 -> F6 
Src -> F1 -> F2                > Merge -> Sink 
                  -> F4 -> F5

F2 should apply a condition: if the data contains format A, it should go to flow F3; otherwise it should go to F4.

As far as I can see, each flow can only have one port in each direction (or two if bidi) - so how can I support such a flow?

asked Apr 17 '16 20:04 by LK__


1 Answer

You can use Broadcast to split the stream, then apply filter or collect on each branch so that it only passes the elements it should handle.

val split = builder.add(Broadcast[Int](2))

Src -> F1 -> split -> filterCondA -> F3 -> F6 -> Merge -> Sink
                   -> filterCondB -> F4 -> F5 -> Merge
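To make the Broadcast approach concrete, here is a minimal sketch, assuming Akka 2.6 or later (where an implicit ActorSystem supplies the materializer; on older versions you would need an explicit ActorMaterializer). The predicate isFormatA and the two branch flows are hypothetical stand-ins for the real condition and for F3 -> F6 and F4 -> F5. The split and merge are wrapped in a FlowShape, so the whole thing can be used as an ordinary Flow with one input and one output.

```scala
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object BroadcastFilterSketch {
  // Hypothetical stand-in for "data contains format A".
  def isFormatA(n: Int): Boolean = n % 2 == 0

  // Hypothetical stand-ins for the F3 -> F6 and F4 -> F5 chains.
  val branchA = Flow[Int].map(n => s"A:$n")
  val branchB = Flow[Int].map(n => s"B:$n")

  // Every element is broadcast to both branches; each branch filters
  // out the elements that belong to the other one.
  val conditional = Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val split = builder.add(Broadcast[Int](2))
    val merge = builder.add(Merge[String](2))

    split ~> Flow[Int].filter(isFormatA)    ~> branchA ~> merge
    split ~> Flow[Int].filterNot(isFormatA) ~> branchB ~> merge

    FlowShape(split.in, merge.out)
  })

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("broadcast-filter")
    try Await.result(Source(1 to 6).via(conditional).runWith(Sink.seq[String]), 5.seconds)
    finally system.terminate()
  }

  // Merge does not guarantee ordering between branches, so sort before printing.
  def main(args: Array[String]): Unit = println(run().sorted)
}
```

With this approach the two filters must be exact complements, otherwise elements are dropped or duplicated.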

Alternatively, there is the Partition stage, which takes the number of output ports and a function f: T => Int that maps each element to the index of the output port it should go to.

// CondA and CondB are placeholders for your own patterns; T is your element type.
def portMapper(value: T): Int = value match {
  case CondA => 0
  case CondB => 1
}

val split = builder.add(Partition[T](2, portMapper))

Src -> F1 -> split -> F3 -> F6 -> Merge -> Sink
             split -> F4 -> F5 -> Merge
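The Partition variant could look roughly like the sketch below, again assuming Akka 2.6 or later. The Payload hierarchy, portMapper, and the two branch flows are hypothetical and only illustrate the routing; substitute your own element type and F3 -> F6 / F4 -> F5 chains.

```scala
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object PartitionSketch {
  // Hypothetical element type: FormatA goes to the first branch, Other to the second.
  sealed trait Payload
  final case class FormatA(value: Int) extends Payload
  final case class Other(value: Int) extends Payload

  // Maps each element to the index of the output port it should leave through.
  def portMapper(p: Payload): Int = p match {
    case _: FormatA => 0
    case _: Other   => 1
  }

  // Hypothetical stand-ins for the F3 -> F6 and F4 -> F5 chains.
  val branchA = Flow[Payload].map(p => s"A:$p")
  val branchB = Flow[Payload].map(p => s"B:$p")

  // Each element is sent to exactly one branch, so no filters are needed.
  val conditional = Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val split = builder.add(Partition[Payload](2, portMapper))
    val merge = builder.add(Merge[String](2))

    split.out(0) ~> branchA ~> merge
    split.out(1) ~> branchB ~> merge

    FlowShape(split.in, merge.out)
  })

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("partition")
    val input = List[Payload](FormatA(1), Other(2), FormatA(3), Other(4))
    try Await.result(Source(input).via(conditional).runWith(Sink.seq[String]), 5.seconds)
    finally system.terminate()
  }

  // Merge does not guarantee ordering between branches, so sort before printing.
  def main(args: Array[String]): Unit = println(run().sorted)
}
```

Compared with Broadcast plus filter, Partition evaluates the condition once per element and cannot accidentally route an element to both branches or to neither.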

Maybe there is some simpler way.

answered Oct 25 '22 06:10 by andrey.feoktistov