I have an Akka Streams application with a single flow/graph. I want to measure the flow rate at the source and log it every 5 seconds, like 'received 3 messages in the last 5 seconds'. I tried this:
    someOtherFlow
      .groupedWithin(Integer.MAX_VALUE, 5 seconds)
      .runForeach(seq =>
        log.debug(s"received ${seq.length} messages in the last 5 seconds")
      )
However, it only produces output for windows that contain messages; no empty list is emitted when there are 0 messages. I want the zeros as well. Is this possible?
You could try something like this:
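    import akka.NotUsed
    import akka.stream.scaladsl.Source
    import scala.concurrent.duration._

    src
      .conflateWithSeed(_ => 1) { case (acc, _) => acc + 1 } // count elements while downstream is not pulling
      .zip(Source.tick(5.seconds, 5.seconds, NotUsed))       // pair each count with a 5-second tick
      .map(_._1)                                             // keep only the count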
This should keep counting your elements until the next tick releases the count. It is inspired by an example in the docs.
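One caveat, as far as I can tell: conflateWithSeed only has something to emit once at least one element has arrived, and zip waits for both of its inputs, so an interval with no elements still yields no 0. A minimal sketch of an alternative that does emit zeros, assuming Akka 2.6+ (where an implicit ActorSystem is enough to run a stream): merge the ticks into the element stream and reset a counter on every tick. The names FlowRate, Event, Element, Tick and countPerInterval are made up for illustration, and the demo source is only a stand-in for your real one.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import scala.concurrent.duration._

object FlowRate {
  // Marker events (hypothetical names): one per upstream element, one per timer tick.
  sealed trait Event
  case object Element extends Event
  case object Tick extends Event

  // Emits, once per `interval`, the number of elements `src` produced
  // since the previous tick, including 0 for idle intervals.
  def countPerInterval[T, M](src: Source[T, M], interval: FiniteDuration): Source[Int, M] =
    src
      .map[Event](_ => Element)
      .merge(Source.tick[Event](interval, interval, Tick))
      .statefulMapConcat[Int] { () =>
        var count = 0 // elements seen since the last tick
        (event: Event) =>
          event match {
            case Element =>
              count += 1
              Nil // nothing to emit until the next tick
            case Tick =>
              val seen = count
              count = 0
              List(seen)
          }
      }
}

object FlowRateDemo {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("flow-rate-demo")

    // Stand-in source (an assumption): one message every 2 seconds.
    val messages = Source.tick(0.seconds, 2.seconds, "msg")

    FlowRate
      .countPerInterval(messages, 5.seconds)
      .runForeach(n => println(s"received $n messages in the last 5 seconds"))
  }
}
```

Because the tick source never completes and merge does not complete eagerly by default, the counting stream keeps running (and keeps reporting 0) after the original source finishes. If the elements also need to flow on to the rest of your graph, you could attach this counter as a side branch, for example with alsoTo.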
On a different note, if you need this for monitoring, you could use a third-party tool such as Kamon.