
How to log flow rate in Akka Stream?

I have an Akka Streams application with a single flow/graph. I want to measure the flow rate at the source and log it every 5 seconds, e.g. 'received 3 messages in the last 5 seconds'. I tried:

someOtherFlow
  .groupedWithin(Integer.MAX_VALUE, 5 seconds)
  .runForeach(seq => 
    log.debug(s"received ${seq.length} messages in the last 5 seconds")
  )

but it only logs when messages arrive: no empty batch is emitted for a window with 0 messages. I want the zero counts logged as well. Is this possible?

asked Dec 04 '22 23:12 by Jasper


1 Answer

You could try something like

  src
    .conflateWithSeed(_ => 1) { case (acc, _) => acc + 1 }
    .zip(Source.tick(5.seconds, 5.seconds, NotUsed))
    .map(_._1)

which should batch your elements (as a running count) until the tick releases them. This is inspired by an example in the docs.
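One caveat, as far as I can tell: conflate only has something to emit once at least one element has arrived, so a completely idle window would still produce no output rather than a 0. A possible variation (a different technique from the snippet above, not taken from the docs) is to merge the ticks into the stream and count elements between ticks. The sketch below assumes Akka 2.6+ (implicit materializer from the ActorSystem); `FlowRateLogger`, `Event`, `Element`, `Tick`, `windowCounter` and the throttled stand-in source are all hypothetical names made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import scala.concurrent.duration._

object FlowRateLogger {
  // Hypothetical marker events: one per upstream message, one per timer tick.
  sealed trait Event
  case object Element extends Event
  case object Tick extends Event

  // Builds a fresh stateful function: counts Elements and, on each Tick,
  // emits the count seen since the previous Tick (possibly 0) and resets it.
  def windowCounter(): Event => List[Int] = {
    var count = 0
    (event: Event) => event match {
      case Element =>
        count += 1
        Nil
      case Tick =>
        val n = count
        count = 0
        List(n)
    }
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("flow-rate")
    import system.dispatcher

    // Stand-in for the real source (assumption): 20 messages at about 1 per second.
    val src = Source(1 to 20).throttle(1, 1.second)

    // One Tick every 5 seconds; Source.tick never completes on its own.
    val ticks = Source.tick(5.seconds, 5.seconds, Tick: Event)

    src
      .map[Event](_ => Element)
      // eagerComplete = true ends the stream when src ends;
      // the count of the last, unfinished window is dropped in that case.
      .merge(ticks, eagerComplete = true)
      // A new counter is created per materialization, so the var is not shared.
      .statefulMapConcat[Int](() => windowCounter())
      .runForeach(n => println(s"received $n messages in the last 5 seconds"))
      .onComplete(_ => system.terminate())
  }
}
```

Because every tick passes through the counter, a window without messages should yield a 0 instead of nothing. Replace `println` with your `log.debug` call; if you want the messages themselves to continue downstream, you would probably attach this as a side branch (e.g. with `alsoTo`) rather than consume the main stream here.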

On a different note, if you need this for monitoring, you could use a third-party tool instead, e.g. Kamon.

answered Dec 22 '22 04:12 by Stefano Bonetti