I've been trying to set-up some instrumentation for Akka streams. Got it working, but, even though I named all my Flows that are part of the streams, I still get this sort of names in the metrics: flow-0-0-unknown-operation
A simple example of what I'm trying to do:
val myflow = Flow[String].named("myflow").map(println)
Source.via(myflow).to(Sink.ignore).run()
I basically want to see the metrics for the Actor that gets created for "myflow", with a proper name.
Is this even possible? Am I missing something?
A Flow is a set of stream processing steps that has one open input and one open output. Source Flow.scala.
The Akka Streams library calls them materialized values. That's because, when you plug components together, you have an inert graph, but when you call the run method, the graph comes alive, or is materialized. The Jedi value returned by materializing a graph is called a materialized value.
Unlike heavier “streaming data processing” frameworks, Akka Streams are neither “deployed” nor automatically distributed.
Back-pressure. A means of flow-control, a way for consumers of data to notify a producer about their current availability, effectively slowing down the upstream producer to match their consumption speeds. In the context of Akka Streams back-pressure is always understood as non-blocking and asynchronous.
I was having this challenge in my project and I solved by using Kamon + Prometheus. However I had to create an Akka Stream Flow
which I can set its name metricName
and export the metric values from it using val kamonThroughputGauge: Metric.Gauge
.
class MonitorProcessingTimerFlow[T](interval: FiniteDuration)(metricName: String = "monitorFlow") extends GraphStage[FlowShape[T, T]] {
val in = Inlet[T]("MonitorProcessingTimerFlow.in")
val out = Outlet[T]("MonitorProcessingTimerFlow.out")
Kamon.init()
val kamonThroughputGauge: Metric.Gauge = Kamon.gauge("akka-stream-throughput-monitor")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
// mutable state
var open = false
var count = 0
var start = System.nanoTime
setHandler(in, new InHandler {
override def onPush(): Unit = {
try {
push(out, grab(in))
count += 1
if (!open) {
open = true
scheduleOnce(None, interval)
}
} catch {
case e: Throwable => failStage(e)
}
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
override protected def onTimer(timerKey: Any): Unit = {
open = false
val duration = (System.nanoTime - start) / 1e9d
val throughput = count / duration
kamonThroughputGauge.withTag("name", metricName).update(throughput)
count = 0
start = System.nanoTime
}
}
override def shape: FlowShape[T, T] = FlowShape[T, T](in, out)
}
Then I created a simple stream that uses the MonitorProcessingTimerFlow
to export the metrics:
implicit val system = ActorSystem("FirstStreamMonitoring")
val source = Source(Stream.from(1)).throttle(1, 1 second)
/** Simulating workload fluctuation: A Flow that expand the event to a random number of multiple events */
val flow = Flow[Int].extrapolate { element =>
Stream.continually(Random.nextInt(100)).take(Random.nextInt(100)).iterator
}
val monitorFlow = Flow.fromGraph(new MonitorProcessingTimerFlow[Int](5 seconds)("monitorFlow"))
val sink = Sink.foreach[Int](println)
val graph = source
.via(flow)
.via(monitorFlow)
.to(sink)
graph.run()
with a proper configuration at application.conf
:
kamon.instrumentation.akka.filters {
actors.track {
includes = [ "FirstStreamMonitoring/user/*" ]
}
}
I can see the throughput metrics on prometheus console with the name name="monitorFlow"
:
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With