
Flink: How to convert the deprecated fold to aggregate?

I am following the quick start example of Flink: Monitoring the Wikipedia Edit Stream.

The example is in Java, and I am implementing it in Scala, as follows:

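import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource}

/**
 * Wikipedia Edit Monitoring
 */
object WikipediaEditMonitoring {
  def main(args: Array[String]): Unit = {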
    // set up the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)

    val result = edits.keyBy( _.getUser )
      .timeWindow(Time.seconds(5))
      .fold(("", 0L)) {
        (acc: (String, Long), event: WikipediaEditEvent) => {
          (event.getUser, acc._2 + event.getByteDiff)
        }
      }

    result.print

    // execute program
    env.execute("Wikipedia Edit Monitoring")
  }
}

However, the fold function in Flink is already deprecated, and the aggregate function is recommended.


But I could not find an example or tutorial showing how to convert the deprecated fold to aggregate.

Any idea how to do this? It probably takes more than just calling aggregate.

UPDATE

I have another implementation, as follows:

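import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource}

/**
 * Wikipedia Edit Monitoring
 */
object WikipediaEditMonitoring {
  def main(args: Array[String]): Unit = {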
    // set up the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)

    val result = edits
      .map( e => UserWithEdits(e.getUser, e.getByteDiff) )
      .keyBy( "user" )
      .timeWindow(Time.seconds(5))
      .sum("edits")

    result.print

    // execute program
    env.execute("Wikipedia Edit Monitoring")
  }

  /** Data type for a user with the summed byte diff of their edits */
  case class UserWithEdits(user: String, edits: Long)
}

I would also like to know how to implement this with a self-defined AggregateFunction.

UPDATE

I followed this documentation: AggregateFunction, but have the following question:

In the source code of the AggregateFunction interface for release 1.3, add returns void:

void add(IN value, ACC accumulator);

But in the 1.4 version of AggregateFunction, it returns the accumulator:

ACC add(IN value, ACC accumulator);

How should I handle this?

The Flink version I am using is 1.3.2, and the documentation for this version does not cover AggregateFunction, but release 1.4 is not in Artifactory yet.

fluency03 asked Nov 05 '17 16:11


2 Answers

You will find some documentation for AggregateFunction in the Flink 1.4 docs, including an example.

The version included in 1.3.2 can only be used with mutable accumulator types, where the add operation modifies the accumulator in place. This has been fixed for Flink 1.4, but 1.4 hasn't been released yet.
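
To illustrate what a mutable accumulator means for the 1.3-style signature, here is a minimal sketch in plain Scala. It does not use Flink at all: AggregateFunction13 is a local stand-in that only mirrors the void-returning add quoted in the question, and Edit, EditAcc and SumBytes13 are hypothetical names made up for the example.

```scala
// Local stand-in for the 1.3-style interface (add returns Unit).
// Assumption: this only mirrors the signature quoted in the question;
// it is not the real Flink interface.
trait AggregateFunction13[IN, ACC, OUT] {
  def createAccumulator(): ACC
  def add(value: IN, accumulator: ACC): Unit
  def getResult(accumulator: ACC): OUT
  def merge(a: ACC, b: ACC): ACC
}

// Hypothetical stand-in for WikipediaEditEvent
final case class Edit(user: String, byteDiff: Int)

// Mutable accumulator: add cannot return a new value, so it must update fields in place
final class EditAcc(var user: String = "", var bytes: Long = 0L)

object SumBytes13 extends AggregateFunction13[Edit, EditAcc, (String, Long)] {
  def createAccumulator(): EditAcc = new EditAcc()

  // Mutates the accumulator instead of returning a new one
  def add(value: Edit, acc: EditAcc): Unit = {
    acc.user = value.user
    acc.bytes += value.byteDiff
  }

  def getResult(acc: EditAcc): (String, Long) = (acc.user, acc.bytes)

  // Folds b into a and returns a
  def merge(a: EditAcc, b: EditAcc): EditAcc = {
    if (a.user.isEmpty) a.user = b.user
    a.bytes += b.bytes
    a
  }
}
```

With an immutable tuple such as (String, Long) as the accumulator, a void-returning add has no way to hand back the updated value, which is why the tuple-based approach needs the 1.4 signature.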

David Anderson answered Nov 10 '22 19:11


import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08
import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource}

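// Sums the byte diff of all edits per user and window.
// Relies on the Flink 1.4 interface, where add returns the updated accumulator.
class SumAggregate extends AggregateFunction[WikipediaEditEvent, (String, Int), (String, Int)] {
  // Empty accumulator: no user seen yet, zero bytes
  override def createAccumulator() = ("", 0)

  override def add(value: WikipediaEditEvent, accumulator: (String, Int)) = (value.getUser, value.getByteDiff + accumulator._2)

  override def getResult(accumulator: (String, Int)) = accumulator

  // Keep whichever user name is already set, so merging with an empty accumulator does not drop it
  override def merge(a: (String, Int), b: (String, Int)) = (if (a._1.nonEmpty) a._1 else b._1, a._2 + b._2)
}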

object WikipediaAnalysis extends App {
  val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  val edits: DataStream[WikipediaEditEvent] = see.addSource(new WikipediaEditsSource())

  val result: DataStream[(String, Int)] = edits
    .keyBy(_.getUser)
    .timeWindow(Time.seconds(5))
    .aggregate(new SumAggregate)
    // Deprecated equivalent: .fold(("", 0))((acc, event) => (event.getUser, acc._2 + event.getByteDiff))
  result.print()

  result.map(_.toString()).addSink(new FlinkKafkaProducer08[String]("localhost:9092", "wiki-result", new SimpleStringSchema()))
  see.execute("Wikipedia User Edit Volume")
}
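
The general mapping from fold to aggregate can be sketched without Flink: the fold's initial value becomes createAccumulator, the fold function becomes add, getResult is the identity, and merge needs one extra function that combines two partial accumulators. In the sketch below, Aggregate14 is a local stand-in that only mirrors the 1.4 signature quoted in the question, and fromFold, EditEvent and the demo object are hypothetical names for illustration.

```scala
// Local stand-in for the 1.4-style interface (add returns the accumulator).
// Assumption: mirrors the signature quoted in the question; not the real Flink interface.
trait Aggregate14[IN, ACC, OUT] {
  def createAccumulator(): ACC
  def add(value: IN, accumulator: ACC): ACC
  def getResult(accumulator: ACC): OUT
  def merge(a: ACC, b: ACC): ACC
}

object FoldToAggregate {
  // Hypothetical helper: builds an aggregate from a fold's pieces plus a combine function
  def fromFold[IN, ACC](init: ACC)(step: (ACC, IN) => ACC)(combine: (ACC, ACC) => ACC): Aggregate14[IN, ACC, ACC] =
    new Aggregate14[IN, ACC, ACC] {
      def createAccumulator(): ACC = init
      def add(value: IN, accumulator: ACC): ACC = step(accumulator, value)
      def getResult(accumulator: ACC): ACC = accumulator
      def merge(a: ACC, b: ACC): ACC = combine(a, b)
    }
}

// Hypothetical stand-in for WikipediaEditEvent
final case class EditEvent(user: String, byteDiff: Int)

object FoldToAggregateDemo {
  // All events share one user, as they would within a keyed window
  val events = Seq(EditEvent("bob", 10), EditEvent("bob", -3), EditEvent("bob", 5))

  val agg = FoldToAggregate.fromFold[EditEvent, (String, Long)](("", 0L))(
    (acc, e) => (e.user, acc._2 + e.byteDiff))(
    (a, b) => (if (a._1.nonEmpty) a._1 else b._1, a._2 + b._2))

  // Driving the aggregate by hand gives the same result as the plain fold
  val viaAggregate = agg.getResult(events.foldLeft(agg.createAccumulator())((acc, e) => agg.add(e, acc)))
  val viaFold = events.foldLeft(("", 0L))((acc, e) => (e.user, acc._2 + e.byteDiff))
}
```

The combine function is the only part a fold does not already provide; it is needed because partial accumulators may have to be merged, for example when windows are merged.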

Casel Chen answered Nov 10 '22 20:11