I'm doing a hopping window aggregation on a 4-hour window advancing every 5 minutes. Because the hopping windows overlap, I'm getting duplicate keys with different aggregated values.
TimeWindows.of(240 * 60 * 1000L).advanceBy(5 * 60 * 1000L)
How do I eliminate duplicate keys with repeating data, or pick only the keys that hold the latest value?
Update May 2021: The Kafka Streams API supports "final" window results nowadays, via a suppress() operator. See the previous docs link as well as the blog post "Kafka Streams' Take on Watermarks and Triggers" from March 2019 for details.
After defining your windowed computation, you can suppress the intermediate results, emitting the final count for each user when the window is closed.
KGroupedStream<UserId, Event> grouped = ...;

grouped
    .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ofMinutes(10)))
    .count()
    // hold back intermediate updates; emit once per window, after it closes
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .filter((windowedUserId, count) -> count < 3)
    .toStream()
    .foreach((windowedUserId, count) ->
        sendAlert(windowedUserId.window(), windowedUserId.key(), count));
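Applied to the 4-hour hopping window from the question, a minimal sketch could look like the following. The grace period, the String/Long key and value types, and the foreach output are assumptions for illustration, since they aren't given in the question:

// Sketch only: assumes imports of TimeWindows, Suppressed, KStream from
// org.apache.kafka.streams.kstream.* and java.time.Duration, and that
// "events" is a KStream<String, Long> built elsewhere in your topology.
KStream<String, Long> events = ...;

events
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofHours(4))
                           .advanceBy(Duration.ofMinutes(5))
                           .grace(Duration.ofMinutes(1)))   // grace period is an assumption
    .count()
    // emit each 4-hour window's count exactly once, when that window closes
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    .foreach((windowedKey, count) ->
        System.out.println(windowedKey.key() + " @ window starting " +
            windowedKey.window().start() + " -> " + count));

Note that with hopping windows every overlapping window still produces its own single final result, so you will still see one record per key roughly every 5 minutes as each window closes; suppress() only removes the intermediate updates within each window.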
Original answer (still applies when NOT using the suppress() operator above):
If I understand you correctly, then this is expected behavior. You are not seeing "duplicate" keys; rather, you are seeing continuous updates for the same key.
Think:
# Extreme case: record caches disabled (size set to 0)
alice->1, alice->2, alice->3, alice->4, ..., alice->100, ...
# With record cache enabled, you would see something like this.
alice->23, alice->59, alice->100, ...
Take a look at the explanation at http://docs.confluent.io/current/streams/developer-guide.html#streams-developer-guide-memory-management, which describes this in more detail. If you want to see fewer "duplicates" per record key, you can increase the size of the record caches (when using the DSL) via cache.max.bytes.buffering (aka StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) in your application's configuration. There's also an interplay with commit.interval.ms: the caches are flushed on every commit, so a longer commit interval also means fewer intermediate updates being emitted downstream.
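As a rough sketch of where those settings live (the values below are illustrative placeholders, not recommendations, and `topology` is assumed to be your already-built topology):

// Sketch: larger record cache + longer commit interval = fewer intermediate updates.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-hopping-window-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 50 * 1024 * 1024L); // 50 MB cache (placeholder)
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000L);               // commit every 30 s (placeholder)

KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();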
If you are wondering "why does the Kafka Streams API behave in this way in the first place", I'd recommend the blog post https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/ that was published earlier this week.