
Difference between Balance and Broadcast fan out in Akka Streams

I am a little confused about the fan-out strategies in Akka Streams. I read that Broadcast (1 input, N outputs), given an input element, emits it to each output, while Balance (1 input, N outputs), given an input element, emits it to one of its output ports.

Can you explain:

  1. How does Balance work with multiple consumers?
  2. What does the phrase "emits to one of its output ports" mean?
  3. Is a port the same thing as a downstream?
  4. Does Balance replicate the input stream into several output partitions?
  5. What does "balance is enabling graphs to be split apart and multiple instances of downstream subscribers replicated to handle the volume" mean?
pacman asked Mar 11 '23 20:03


1 Answer

According to the documentation, Broadcast emits (sends) each element to every consumer, whereas Balance emits each element only to the first available consumer.

broadcast

Emit each incoming element to each of n outputs.

balance

Fan-out the stream to several streams. Each upstream element is emitted to the first available downstream consumer.
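To make the difference concrete, here is a minimal sketch that runs the same six integers through each fan-out stage and collects what each of the two outputs receives. It assumes Akka 2.6 or later (where an implicit ActorSystem supplies the materializer); the object name FanOutDemo and the helper run are made up for illustration. On recent Akka versions the GraphDSL.create overload used here may produce a deprecation warning in favour of GraphDSL.createGraph.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ClosedShape, Graph, UniformFanOutShape}
import akka.stream.scaladsl.{Balance, Broadcast, GraphDSL, RunnableGraph, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; assumes Akka 2.6+.
object FanOutDemo {

  // Feeds 1..6 into the given fan-out stage and returns what each of the
  // two output ports received, as (first output, second output).
  def run(fanOut: Graph[UniformFanOutShape[Int, Int], NotUsed])(
      implicit system: ActorSystem): (Seq[Int], Seq[Int]) = {
    val graph = RunnableGraph.fromGraph(
      GraphDSL.create(Sink.seq[Int], Sink.seq[Int])((_, _)) { implicit b => (f, z) =>
        import GraphDSL.Implicits._
        val d = b.add(fanOut)
        Source(1 to 6) ~> d.in
        d.out(0) ~> f.in // each output port feeds exactly one downstream
        d.out(1) ~> z.in
        ClosedShape
      })
    val (fFuture, zFuture) = graph.run()
    (Await.result(fFuture, 5.seconds), Await.result(zFuture, 5.seconds))
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("fan-out-demo")
    try {
      // Broadcast: both outputs see every element.
      println(s"Broadcast: ${run(Broadcast[Int](2))}")
      // Balance: each element goes to exactly one output; which one
      // depends on downstream demand, so the split is not fixed.
      println(s"Balance:   ${run(Balance[Int](2))}")
    } finally system.terminate()
  }
}
```

With Broadcast, both sequences contain all six elements. With Balance, the two sequences together contain each element exactly once, but how they are divided depends on which downstream signals demand first, so it can vary between runs.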

EDIT from comments:

Based on your gist, you should make two averageCarrierDelay flows, one for Z and one for F, each printing the elements it receives. Then you can see exactly which elements are sent to each branch.

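import scala.util.Try
import akka.stream.scaladsl.Flow

// Branch Z: per-carrier (carrier, record count, total arrival delay in minutes).
// FlightDelayRecord is the case class from the gist.
val averageCarrierDelayZ =
  Flow[FlightDelayRecord]
    .groupBy(30, _.uniqueCarrier) // at most 30 distinct carriers
    .fold(("", 0, 0)) {
      (x: (String, Int, Int), y: FlightDelayRecord) => {
        println(s"Z Received Element: ${y}")
        val count = x._2 + 1
        // A non-numeric delay value counts as 0 minutes.
        val totalMins = x._3 + Try(y.arrDelayMins.toInt).getOrElse(0)
        (y.uniqueCarrier, count, totalMins)
      }
    }
    .mergeSubstreams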


// Branch F: same aggregation as Z, with a different log prefix.
val averageCarrierDelayF =
  Flow[FlightDelayRecord]
    .groupBy(30, _.uniqueCarrier)
    .fold(("", 0, 0)) {
      (x: (String, Int, Int), y: FlightDelayRecord) => {
        println(s"F Received Element: ${y}")
        val count = x._2 + 1
        val totalMins = x._3 + Try(y.arrDelayMins.toInt).getOrElse(0)
        (y.uniqueCarrier, count, totalMins)
      }
    }
    .mergeSubstreams

Edit 2: To check things in the future I'd recommend a generic logger for stream stages so you can see what is going on.

def logElement[A](msg: String) = Flow[A].map { a => println(s"${msg} ${a}"); a }

Doing this allows you to do something like:

D ~> logElement[FlightDelayRecord]("F received: ") ~> F
D ~> logElement[FlightDelayRecord]("Z received: ") ~> Z

This way you can inspect any part of your graph and spot behavior you did not expect.
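For completeness, here is a small self-contained sketch of that logging helper wired into a runnable graph. It assumes Akka 2.6 or later and uses plain strings in place of FlightDelayRecord; the object name LogElementDemo, the run helper, and the sample carrier codes are made up for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, RunnableGraph, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; assumes Akka 2.6+.
object LogElementDemo {

  // Pass-through stage that prints every element it sees.
  def logElement[A](msg: String): Flow[A, A, NotUsed] =
    Flow[A].map { a => println(s"$msg $a"); a }

  // Balances four sample strings across two logged branches and returns
  // what each branch collected, as (F elements, Z elements).
  def run()(implicit system: ActorSystem): (Seq[String], Seq[String]) = {
    val graph = RunnableGraph.fromGraph(
      GraphDSL.create(Sink.seq[String], Sink.seq[String])((_, _)) { implicit b => (f, z) =>
        import GraphDSL.Implicits._
        val d = b.add(Balance[String](2))
        Source(List("AA", "DL", "UA", "WN")) ~> d.in // made-up sample data
        d.out(0) ~> logElement[String]("F received:") ~> f.in
        d.out(1) ~> logElement[String]("Z received:") ~> z.in
        ClosedShape
      })
    val (fFuture, zFuture) = graph.run()
    (Await.result(fFuture, 5.seconds), Await.result(zFuture, 5.seconds))
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("log-element-demo")
    try println(run())
    finally system.terminate()
  }
}
```

Because the stage in front of the branches is Balance, each string is logged by only one of the two branches; swapping in Broadcast[String](2) would make both branches log every string.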

Brian Pendleton answered Apr 08 '23 20:04