
Are there examples of using the Histogram accumulator in Flink

Tags:

apache-flink

I stumbled on the Histogram class in the Flink hierarchy, but there's no "here's how you can use this" kind of documentation around it. I wanted to do something like:

dataStream
    .countWindowAll(100)
    .fold(new Histogram(), (histogram,data) -> {histogram.add(data.getValue()); return histogram;})
    .flatMap((h, out) -> h.getLocalValue().navigableKeySet().iterator().forEachRemaining(key -> out.collect(key.toString() + "," + h.getLocalValue().get(key).toString())))
    .print()

but sadly the Histogram isn't serializable through Flink. Maybe there's a "here's how you can use this" guide somewhere, or another way to build a histogram in Flink.

I'm clearly doing something wrong.

Cubs Fan Ron asked Mar 10 '23 07:03


1 Answer

Flink's accumulators are not meant to be used as data types for DataStream or DataSet.

Instead, they are registered via the RuntimeContext, which is available from RichFunction.getRuntimeContext(). This is usually done in the open() method of a RichFunction:

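import org.apache.flink.api.common.accumulators.Histogram
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class MyFunc extends RichFlatMapFunction[Int, Int] {

  // One accumulator object per parallel instance of this function
  val hist: Histogram = new Histogram()

  // Register the accumulator under a job-wide name before any records arrive
  override def open(conf: Configuration): Unit = {
    getRuntimeContext.addAccumulator("myHist", hist)
  }

  // Count each incoming value; this example emits nothing downstream
  override def flatMap(value: Int, out: Collector[Int]): Unit = {
    hist.add(value)
  }
}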

All parallel instances of an accumulator are periodically shipped to the JobManager (the master process) and merged. The merged values can be accessed from the JobExecutionResult returned from StreamExecutionEnvironment.execute(), for example with getAccumulatorResult("myHist").
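
To make "merged" concrete, here is a minimal plain-Java sketch of what combining two per-instance histograms amounts to: counts for the same key are summed. It does not use Flink at all; the class HistogramMergeSketch and its add and merge helpers are hypothetical names chosen for illustration, and the value-to-count map is only my assumption about how such a histogram is typically represented.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for per-instance histogram state; this is not Flink code.
class HistogramMergeSketch {

    // Count one observation in a local (per-instance) histogram.
    static void add(TreeMap<Integer, Integer> local, int value) {
        local.merge(value, 1, Integer::sum);
    }

    // Combine two partial histograms by summing the counts of equal keys.
    static TreeMap<Integer, Integer> merge(Map<Integer, Integer> left,
                                           Map<Integer, Integer> right) {
        TreeMap<Integer, Integer> result = new TreeMap<>(left);
        right.forEach((key, count) -> result.merge(key, count, Integer::sum));
        return result;
    }

    public static void main(String[] args) {
        TreeMap<Integer, Integer> instanceA = new TreeMap<>();
        TreeMap<Integer, Integer> instanceB = new TreeMap<>();
        for (int v : new int[] {1, 1, 3}) add(instanceA, v);
        for (int v : new int[] {3, 5}) add(instanceB, v);
        // Key 3 was seen once by each instance, so its merged count is 2.
        System.out.println(merge(instanceA, instanceB)); // prints {1=2, 3=2, 5=1}
    }
}
```

The point of the sketch is that the merged result describes the whole job, not any single window, which is why it only becomes useful once the partial results have been collected.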

I think your use case cannot be addressed by Flink's accumulators, because you want a histogram per window that flows through the stream as a regular record. You should create a custom histogram data type instead.
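
A custom histogram type could look roughly like the following sketch. SimpleHistogram, add and toLines are hypothetical names, not part of Flink, and the code is plain Java with no Flink dependency. It only shows the data type itself; how Flink would serialize it (as a POJO or through its generic fallback) depends on Flink's type-extraction rules and is not verified here.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical custom histogram type; class and method names are illustrative.
class SimpleHistogram implements Serializable {
    private static final long serialVersionUID = 1L;

    // Sorted value -> count map, so buckets are listed in key order.
    private final TreeMap<Integer, Integer> counts = new TreeMap<>();

    // Record one observation and return this, so calls can be chained.
    SimpleHistogram add(int value) {
        counts.merge(value, 1, Integer::sum);
        return this;
    }

    // Render each bucket as "value,count", the output format used in the question.
    List<String> toLines() {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<Integer, Integer> e : counts.entrySet()) {
            lines.add(e.getKey() + "," + e.getValue());
        }
        return lines;
    }

    public static void main(String[] args) {
        SimpleHistogram h = new SimpleHistogram();
        for (int v : new int[] {4, 2, 4, 4}) h.add(v);
        h.toLines().forEach(System.out::println); // prints 2,1 then 4,3
    }
}
```

Because add returns the histogram, it has the same shape as the step function in the question's windowed aggregation, and toLines covers the "key,count" output that the question's second step tries to produce.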

Fabian Hueske answered May 01 '23 11:05