I have a KStream
in which I want to count some dimension of the events. I do it as follows:
KTable<Windowed<Long>, Counter> ret = input.groupByKey()
.windowedBy(TimeWindows.of(Duration.of(10, SECONDS)))
.aggregate(Counter::new, (k, v, c) -> new Counter(c.count + v.getDimension()));
I want to have a new KStream
with those aggregations as events. I can do it easily like this:
ret.toStream().to("output");
The problem is that every event in "input" topic will produce an event to "output" topic. I would like to publish an event to the output topic only when a window is finished. For example if the window is of one minute, send a single event per key per minute.
I think I can do it like this:
ret.toStream().foreach((k, v) -> sendToKafkaTopic("output"));
But I wonder if there's a better / more elegant way of doing this?
You can use new feature of KTable KTable.suppress in version 2.1
This method allows you get exactly one final result per window/key for windowed computations.
More about suppres
in KIP-328
You can update your implementation with suppress
like this:
KTable<Windowed<Long>, Counter> ret = input.groupByKey()
.windowedBy(TimeWindows.of(Duration.of(10, SECONDS)))
.aggregate(Counter::new, (k, v, c) -> new Counter(c.count + v.getDimension()))
.suppress(untilWindowCloses(BufferConfig.unbounded()));
ret.toStream().to("output"); // now stream should flush events to the output topic only when the window closes
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With