I stumbled on the Histogram class in the Flink hierarchy, but there's no "here's how you can use this" kind of documentation around it. I wanted to do something like:
dataStream
    .countWindowAll(100)
    .fold(new Histogram(), (histogram, data) -> { histogram.add(data.getValue()); return histogram; })
    .flatMap((h, out) -> h.getLocalValue().navigableKeySet().iterator()
        .forEachRemaining(key -> out.collect(key.toString() + "," + h.getLocalValue().get(key).toString())))
    .print();
but sadly Histogram isn't serializable through Flink. Maybe there's a usage guide I missed, or there's another way to build a histogram in Flink.
I'm clearly doing something wrong.
Flink's accumulators are not meant to be used as data types for DataStream or DataSet.
Instead, they are registered via the RuntimeContext, which is available from RichFunction.getRuntimeContext(). This is usually done in the open() method of a RichFunction:
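import org.apache.flink.api.common.accumulators.Histogram
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class MyFunc extends RichFlatMapFunction[Int, Int] {
  val hist: Histogram = new Histogram()
  // Register the accumulator under a name once per parallel instance.
  override def open(conf: Configuration): Unit = {
    getRuntimeContext.addAccumulator("myHist", hist)
  }
  // Count each incoming value; nothing is emitted downstream.
  override def flatMap(value: Int, out: Collector[Int]): Unit = {
    hist.add(value)
  }
}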
All parallel instances of an accumulator are periodically shipped to the JobManager (the master process) and merged. Their values can be accessed from the JobExecutionResult returned by StreamExecutionEnvironment.execute(), for example via getAccumulatorResult("myHist").
I think your use case cannot be addressed by Flink's accumulators. You should create a custom histogram data type.
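A custom histogram type could look roughly like the sketch below. This is only an illustration in plain Java: the class name SimpleHistogram and its methods (add, merge, toCsvLines) are made up for this answer and are not part of Flink, and I have not tested how Flink serializes it in your version.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical custom histogram data type (not a Flink class).
class SimpleHistogram implements Serializable {
    private static final long serialVersionUID = 1L;

    // Sorted map from observed value to its number of occurrences.
    private TreeMap<Integer, Integer> counts = new TreeMap<>();

    public SimpleHistogram() {}

    // Count one occurrence of the value; returns this so it fits a fold-style lambda.
    public SimpleHistogram add(int value) {
        counts.merge(value, 1, Integer::sum);
        return this;
    }

    // Add all counts of another histogram into this one.
    public SimpleHistogram merge(SimpleHistogram other) {
        for (Map.Entry<Integer, Integer> e : other.counts.entrySet()) {
            counts.merge(e.getKey(), e.getValue(), Integer::sum);
        }
        return this;
    }

    public TreeMap<Integer, Integer> getCounts() {
        return counts;
    }

    public void setCounts(TreeMap<Integer, Integer> counts) {
        this.counts = counts;
    }

    // One "value,count" string per bucket, in ascending order of value.
    public List<String> toCsvLines() {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<Integer, Integer> e : counts.entrySet()) {
            lines.add(e.getKey() + "," + e.getValue());
        }
        return lines;
    }
}
```

With something like this, the fold from the question would become .fold(new SimpleHistogram(), (h, d) -> h.add(d.getValue())), and the flatMap could emit each entry of toCsvLines(). Because add returns the histogram itself, the lambda needs no explicit return statement.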