
How to send final kafka-streams aggregation result of a time windowed KTable?

What I'd like to do is this:

  1. Consume records from a numbers topic (Long's)
  2. Aggregate (count) the values for each 5 sec window
  3. Send the FINAL aggregation result to another topic

My code looks like this:

KStream<String, Long> longs = builder.stream(
    Serdes.String(), Serdes.Long(), "longs");

// In one KTable, count by key, on a five second tumbling window.
KTable<Windowed<String>, Long> longCounts =
    longs.countByKey(TimeWindows.of("longCounts", 5000L));

// Finally, sink to the long-counts topic.
longCounts.toStream((wk, v) -> wk.key())
          .to("long-counts");

It looks like everything works as expected, but the aggregations are sent to the destination topic for each incoming record. My question is how can I send only the final aggregation result of each window?

asked Aug 13 '16 by odavid

People also ask

What is the output of KStream KTable join?

Say there are 8,000 records in the KStream and 14 records in the KTable, and assume that every key in the KStream has a matching record in the KTable. The expected output would then be 8,000 records: one joined record per stream record.
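For instance, a minimal sketch of such a join (topic names and types are illustrative, not from the question):

StreamsBuilder builder = new StreamsBuilder();

// Each stream record whose key exists in the table produces exactly one
// joined output record.
KStream<String, String> events =
    builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()));
KTable<String, String> users =
    builder.table("users", Consumed.with(Serdes.String(), Serdes.String()));

events.join(users, (event, user) -> user + " -> " + event)
      .to("joined-events", Produced.with(Serdes.String(), Serdes.String()));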

What is KTable in Kafka streams?

KTable is an abstraction of a changelog stream from a primary-keyed table. Each record in this changelog stream is an update on the primary-keyed table with the record key as the primary key.
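As a small illustration (topic and variable names are hypothetical):

// Records ("alice", "v1") followed by ("alice", "v2") on the input topic
// leave the table with a single row alice -> v2; the second record is an
// update (upsert), not a new entry.
KTable<String, String> profiles =
    builder.table("profiles", Consumed.with(Serdes.String(), Serdes.String()));

// Viewing the table as a stream emits one change record per update.
profiles.toStream()
        .to("profiles-changelog", Produced.with(Serdes.String(), Serdes.String()));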

Can Kafka aggregate data?

Kafka Streams natively supports "incremental" aggregation functions, in which the aggregation result is updated based on the values captured by each window. Incremental functions include `count()`, `sum()`, `min()`, and `max()`. An average aggregation cannot be computed incrementally.
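To make this concrete, an average is usually derived from two quantities that can be aggregated incrementally, the sum and the count. A minimal sketch (the long[] serde is a hypothetical custom serde, not part of Kafka):

// Track {sum, count} incrementally, then derive the average by division;
// the average itself is never updated incrementally.
KGroupedStream<String, Long> grouped = builder
    .stream("numbers", Consumed.with(Serdes.String(), Serdes.Long()))
    .groupByKey();

KTable<String, long[]> sumAndCount = grouped.aggregate(
    () -> new long[]{0L, 0L},                                    // initializer: {sum, count}
    (key, value, agg) -> new long[]{agg[0] + value, agg[1] + 1}, // incremental update
    Materialized.with(Serdes.String(), new LongArraySerde()));   // hypothetical custom serde

KTable<String, Double> average =
    sumAndCount.mapValues(sc -> (double) sc[0] / sc[1]);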

How do you aggregate data in a Kafka topic?

Aggregating a stream takes two steps. First, group the stream: use groupByKey() if the data is already partitioned by the record key, or groupBy((k, v) -> ...) to derive a new grouping key. Second, apply an aggregation such as count() to the grouped stream, as sketched below.
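A short sketch of both variants (topic names are illustrative):

KStream<String, String> input =
    builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()));

// Data already partitioned by the record key: groupByKey(), then aggregate.
KTable<String, Long> countsByKey = input.groupByKey().count();

// Grouping by something other than the key: groupBy() re-keys the stream and
// triggers a repartition before the aggregation.
KTable<String, Long> countsByValue = input
    .groupBy((key, value) -> value, Grouped.with(Serdes.String(), Serdes.String()))
    .count();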


2 Answers

In Kafka Streams there is no such thing as a "final aggregation". Windows are kept open the whole time to handle out-of-order records that arrive after the window's end time has passed. However, windows are not kept forever: they are discarded once their retention time expires, and no special action is triggered when a window is discarded.
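For illustration, in newer versions of the API than the question uses, the retention time (and a grace period for out-of-order records) can be configured explicitly; a minimal sketch, with illustrative store and topic names:

// 5-second windows that accept out-of-order records for another 30 seconds;
// the backing window store is retained for 1 minute before windows are discarded.
KTable<Windowed<String>, Long> windowedCounts = builder
    .stream("longs", Consumed.with(Serdes.String(), Serdes.Long()))
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(5)).grace(Duration.ofSeconds(30)))
    .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("window-counts")
               .withRetention(Duration.ofMinutes(1)));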

See Confluent documentation for more details: http://docs.confluent.io/current/streams/

Thus, for each update to an aggregation, a result record is produced (because Kafka Streams also updates the aggregation result on out-of-order records). Your "final result" would be the latest result record before a window gets discarded. Depending on your use case, manual de-duplication would be a way to resolve the issue, using the lower-level Processor API via transform() or process().
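A rough sketch of such a de-duplication with transform(), using a newer Processor API than the question's code (the store name, punctuation interval, and window-key encoding are illustrative choices, not prescribed by the answer):

// Buffer the latest count per window in a state store and flush it
// periodically; whatever is flushed is the latest update seen so far.
builder.addStateStore(Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("latest-per-window"),
    Serdes.String(), Serdes.Long()));

longCounts.toStream((wk, v) -> wk.key() + "@" + wk.window().start())
    .transform(() -> new Transformer<String, Long, KeyValue<String, Long>>() {
        private KeyValueStore<String, Long> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            store = (KeyValueStore<String, Long>) context.getStateStore("latest-per-window");
            // Every 10 seconds, forward and clear the buffered results.
            context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, ts -> {
                try (KeyValueIterator<String, Long> it = store.all()) {
                    while (it.hasNext()) {
                        KeyValue<String, Long> entry = it.next();
                        context.forward(entry.key, entry.value);
                        store.delete(entry.key);
                    }
                }
            });
        }

        @Override
        public KeyValue<String, Long> transform(String key, Long count) {
            store.put(key, count);  // overwrite earlier updates for this window
            return null;            // emit nothing per input record
        }

        @Override
        public void close() { }
    }, "latest-per-window")
    .to("long-counts", Produced.with(Serdes.String(), Serdes.Long()));

Note that this only reduces duplicates rather than guaranteeing a single final record: a flush can still happen before a window is complete, in which case a later update for the same window would be forwarded again.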

This blog post might help, too: https://timothyrenner.github.io/engineering/2016/08/11/kafka-streams-not-looking-at-facebook.html

Another blog post addressing this issue without using punctuations: http://blog.inovatrend.com/2018/03/making-of-message-gateway-with-kafka.html

Update

With KIP-328, a KTable#suppress() operator is added that allows suppressing consecutive updates in a strict manner and emitting a single result record per window; the tradeoff is increased latency.

answered Sep 23 '22 by Matthias J. Sax


From Kafka Streams version 2.1 onward, you can achieve this using suppress().

There is an example in the Apache Kafka Streams documentation that sends an alert when a user has fewer than three events in an hour:

KGroupedStream<UserId, Event> grouped = ...;
grouped
    .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10)))
    .count()
    .suppress(Suppressed.untilWindowCloses(unbounded()))
    .filter((windowedUserId, count) -> count < 3)
    .toStream()
    .foreach((windowedUserId, count) ->
        sendAlert(windowedUserId.window(), windowedUserId.key(), count));

As mentioned in the update of the previous answer, you should be aware of the tradeoff (increased latency). Moreover, note that suppress() is based on event-time, so results are only emitted as new records arrive and advance stream time.

answered Sep 20 '22 by Amir Masud Zare Bidaki