 

Kafka Streams - Hopping windows - deduplicate keys

I'm doing a hopping window aggregation on a 4-hour window advancing every 5 minutes. Because the hopping windows overlap, I'm getting duplicate keys with different aggregated values.

TimeWindows.of(240 * 60 * 1000L).advanceBy(5 * 60 * 1000L)

How do I eliminate duplicate keys with repeating data, or pick only the keys that hold the latest value?

asked Dec 01 '22 by Barro


1 Answer

Update May 2021: The Kafka Streams API nowadays supports "final" window results via a suppress() operator. See the Kafka Streams documentation on suppress() as well as the March 2019 blog post "Kafka Streams' Take on Watermarks and Triggers" for details.

After defining your windowed computation, you can suppress the intermediate results, emitting the final count for each user when the window is closed.

// Assumes the usual imports (java.time.Duration, org.apache.kafka.streams.kstream.*)
// plus static imports of Duration.ofMinutes and Suppressed.BufferConfig.unbounded.
KGroupedStream<UserId, Event> grouped = ...;

grouped.windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10)))
       .count()
       .suppress(Suppressed.untilWindowCloses(unbounded()))
       .filter((windowedUserId, count) -> count < 3)
       .toStream()
       .foreach((windowedUserId, count) -> sendAlert(windowedUserId.window(), windowedUserId.key(), count));
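
The same pattern applies to the exact windowing from the question. Below is a minimal sketch combining the 4-hour window advancing every 5 minutes with suppress(); the 10-minute grace period and the finalCounts variable name are illustrative assumptions, not from the original post.

import java.time.Duration;

import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

KGroupedStream<UserId, Event> grouped = ...;

KTable<Windowed<UserId>, Long> finalCounts =
    grouped
        // 4-hour hopping window advancing every 5 minutes, as in the question
        .windowedBy(TimeWindows.of(Duration.ofHours(4))
                               .advanceBy(Duration.ofMinutes(5))
                               .grace(Duration.ofMinutes(10)))
        .count()
        // emit each window's result only once, after the window has closed
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));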

Original answer (still applies when NOT using the suppress() operator above):

If I understand you correctly, this is expected behavior. You are not seeing "duplicate" keys; you are seeing continuous updates for the same key.

Think:

# Extreme case: record caches disabled (size set to 0)
alice->1, alice->2, alice->3, alice->4, ..., alice->100, ...

# With the record cache enabled, you would see something like this:
alice->23, alice->59, alice->100, ...

Take a look at the explanation at http://docs.confluent.io/current/streams/developer-guide.html#streams-developer-guide-memory-management, which describes this in more detail. If you want to see fewer "duplicates" per record key, you can increase the size of the record caches (when using the DSL) via cache.max.bytes.buffering, aka StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, in your application's configuration. There's also an interplay with commit.interval.ms, as sketched below.
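
For example, a minimal sketch of bumping the record cache and commit interval in the application's configuration; the 10 MB and 30-second values here are illustrative assumptions only.

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// a larger record cache means fewer intermediate updates are forwarded downstream
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
// cached results are also flushed on commit, so a longer commit interval
// further reduces how often updates per key are emitted
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000L);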

If you are wondering "why does the Kafka Streams API behave in this way in the first place", I'd recommend the blog post https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/ that was published earlier this week.

answered Dec 06 '22 by Michael G. Noll