I have a little confusion about the fan-out strategies in Akka Streams. I read that Broadcast (1 input, N outputs), given an input element, emits it to each output, while Balance (1 input, N outputs), given an input element, emits it to one of its output ports.
Can you explain to me:
According to the documentation, broadcast emits (sends) each element to every consumer, whereas balance emits each element only to the first available consumer.
broadcast
Emit each incoming element to each of n outputs.
balance
Fan-out the stream to several streams. Each upstream element is emitted to the first available downstream consumer.
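To see the difference for yourself, here is a minimal sketch that runs the same four elements through a Broadcast and then through a Balance, each with two printing sinks. It is not taken from your gist: the names FanOutDemo and fanOutGraph are made up for illustration, and it assumes Akka 2.6 or later, where an implicit ActorSystem is enough to materialize a stream.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Balance, Broadcast, GraphDSL, RunnableGraph, Sink, Source}

// Hypothetical demo object, not part of the original gist.
object FanOutDemo extends App {
  // Assumption: Akka 2.6+, where the implicit system also provides the materializer.
  implicit val system: ActorSystem = ActorSystem("fan-out-demo")

  // Builds a closed graph: Source(1 to 4) ~> fan-out stage ~> two printing sinks.
  def fanOutGraph(useBroadcast: Boolean): RunnableGraph[NotUsed] =
    RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val label = if (useBroadcast) "broadcast" else "balance"
      // Both stages have one inlet and two outlets of the same element type.
      val fanOut =
        if (useBroadcast) b.add(Broadcast[Int](2))
        else b.add(Balance[Int](2))

      Source(1 to 4) ~> fanOut.in
      fanOut.out(0) ~> Sink.foreach[Int](x => println(s"$label out0 received: $x"))
      fanOut.out(1) ~> Sink.foreach[Int](x => println(s"$label out1 received: $x"))
      ClosedShape
    })

  fanOutGraph(useBroadcast = true).run()  // every element is printed by out0 and by out1
  fanOutGraph(useBroadcast = false).run() // every element is printed by exactly one of them

  // Crude shutdown for a demo: give both small streams time to finish, then stop.
  Thread.sleep(1000)
  system.terminate()
}
```

With Broadcast you should see eight lines (each of the four elements on both outputs); with Balance you should see four lines, and which output gets which element depends on which sink signals demand first, so it can vary between runs.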
EDIT from comments:
From your gist, you should make two averageCarrierDelay functions, one for each of Z and F. Then you can see all the elements sent to each.
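import akka.stream.scaladsl.Flow
import scala.util.Try

// Folds each carrier's records into (carrier, count, totalDelayMins) and logs every element that reaches Z.
val averageCarrierDelayZ =
  Flow[FlightDelayRecord]
    .groupBy(30, _.uniqueCarrier)
    .fold(("", 0, 0)) {
      (x: (String, Int, Int), y: FlightDelayRecord) => {
        println(s"Z Received Element: ${y}")
        val count = x._2 + 1
        // A delay that cannot be parsed counts as 0 minutes.
        val totalMins = x._3 + Try(y.arrDelayMins.toInt).getOrElse(0)
        (y.uniqueCarrier, count, totalMins)
      }
    }.mergeSubstreams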
// Same fold as the Z version, but logs every element that reaches F.
val averageCarrierDelayF =
  Flow[FlightDelayRecord]
    .groupBy(30, _.uniqueCarrier)
    .fold(("", 0, 0)) {
      (x: (String, Int, Int), y: FlightDelayRecord) => {
        println(s"F Received Element: ${y}")
        val count = x._2 + 1
        val totalMins = x._3 + Try(y.arrDelayMins.toInt).getOrElse(0)
        (y.uniqueCarrier, count, totalMins)
      }
    }.mergeSubstreams
Edit 2: To check things in the future I'd recommend a generic logger for stream stages so you can see what is going on.
def logElement[A](msg: String) = Flow[A].map { a => println(s"${msg} ${a}"); a }
Doing this allows you to do something like:
D ~> logElement[FlightDelayRecord]("F received: ") ~> F
D ~> logElement[FlightDelayRecord]("Z received: ") ~> Z
This way you can check areas of your graph for strange behavior that you may or may not be expecting.