 

How to output result of windowed aggregation only when window is finished? [duplicate]

I have a KStream in which I want to count some dimension of the events. I do it as follows:

KTable<Windowed<Long>, Counter> ret = input.groupByKey()
  .windowedBy(TimeWindows.of(Duration.of(10, SECONDS)))
  .aggregate(Counter::new, (k, v, c) -> new Counter(c.count + v.getDimension()));

I want to have a new KStream with those aggregations as events. I can do it easily like this:

ret.toStream().to("output");

The problem is that every event in the "input" topic produces an event in the "output" topic. I would like to publish to the output topic only when a window is finished. For example, with one-minute windows, I want a single event per key per minute.
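To make the problem concrete, here is a plain-Java model (not Kafka Streams code; the class, method and field names are hypothetical) of a 10-second tumbling-window sum in which every input record forwards the updated aggregate downstream. This is roughly what ret.toStream() sees without suppression, ignoring KTable record caching, which can coalesce some updates but does not guarantee one result per window.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model, NOT Kafka Streams code: a tumbling-window sum that
// forwards every updated aggregate downstream (ignoring KTable record caching).
public class PerUpdateEmission {

    // One downstream record: key, window start (ms) and the running sum at that moment.
    static final class Update {
        final long key;
        final long windowStart;
        final long sum;

        Update(long key, long windowStart, long sum) {
            this.key = key;
            this.windowStart = windowStart;
            this.sum = sum;
        }

        @Override
        public String toString() {
            return "key=" + key + " window@" + windowStart + " sum=" + sum;
        }
    }

    // Start of the tumbling window containing the timestamp, e.g. 12_345 -> 10_000 for 10 s windows.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Each input is {key, timestampMs, dimension}; returns one update per input record.
    static List<Update> process(long[][] inputs, long sizeMs) {
        Map<String, Long> sums = new HashMap<>();
        List<Update> out = new ArrayList<>();
        for (long[] in : inputs) {
            long start = windowStart(in[1], sizeMs);
            long sum = sums.merge(in[0] + "@" + start, in[2], Long::sum);
            out.add(new Update(in[0], start, sum)); // emitted immediately, window not finished yet
        }
        return out;
    }

    public static void main(String[] args) {
        long[][] inputs = {{1, 1_000, 5}, {1, 4_000, 2}, {1, 9_000, 3}, {1, 12_000, 7}};
        // Four inputs produce four outputs, three of them for the same window [0, 10 000).
        process(inputs, 10_000).forEach(System.out::println);
    }
}
```

The downstream consumer sees the partial sums 5 and 7 before the final 10 for the first window, which is exactly the intermediate output the question wants to avoid.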

I think I can do it like this:

ret.toStream().foreach((k, v) -> sendToKafkaTopic("output"));

But is there a better or more elegant way of doing this?

user1028741 asked Dec 27 '18 13:12


1 Answer

You can use KTable.suppress, a new KTable operator added in Kafka 2.1.

This operator lets you get exactly one final result per window and key for windowed computations.

More about suppress in KIP-328.

You can update your implementation with suppress like this:

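KTable<Windowed<Long>, Counter> ret = input.groupByKey()
        // grace() bounds how long late records are accepted; if it is not set, 2.1 defaults to
        // roughly 24 hours, so the final result would be held back for about a day
        .windowedBy(TimeWindows.of(Duration.of(10, SECONDS)).grace(Duration.ZERO))
        .aggregate(Counter::new, (k, v, c) -> new Counter(c.count + v.getDimension()))
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

// emits one final result per key and window, once stream time passes the window end plus the grace period
ret.toStream().to("output");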
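To illustrate what untilWindowCloses does, here is a simplified single-partition model in plain Java. It is not Kafka's implementation, and the class, method and field names are hypothetical. It assumes a window is closed once window end + grace <= stream time, where stream time is the highest record timestamp seen so far; that is a reading of the 2.1 semantics, not a copy of the library code.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified single-partition model, NOT Kafka's implementation, of
// windowedBy(TimeWindows.of(size).grace(grace)).aggregate(sum).suppress(untilWindowCloses(...)).
public class SuppressModel {

    // A final (key, window start, sum) result emitted once per closed window.
    static final class Result {
        final long key;
        final long windowStart;
        final long sum;

        Result(long key, long windowStart, long sum) {
            this.key = key;
            this.windowStart = windowStart;
            this.sum = sum;
        }

        @Override
        public String toString() {
            return "key=" + key + " window@" + windowStart + " sum=" + sum;
        }
    }

    private final long sizeMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE; // highest timestamp seen so far
    // Latest aggregate per "key@windowStart" as {key, windowStart, sum}, held until the window closes.
    private final Map<String, long[]> buffer = new LinkedHashMap<>();
    final List<Result> emitted = new ArrayList<>();

    SuppressModel(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    void process(long key, long timestampMs, long dimension) {
        streamTime = Math.max(streamTime, timestampMs); // stream time never moves backwards
        long start = timestampMs - (timestampMs % sizeMs);
        if (start + sizeMs + graceMs > streamTime) {
            // Window still open: update its buffered aggregate instead of emitting it.
            buffer.merge(key + "@" + start, new long[] {key, start, dimension},
                    (old, add) -> new long[] {old[0], old[1], old[2] + add[2]});
        } // otherwise the record is late for an already-closed window and is dropped
        // Emit and evict every buffered window whose end + grace has been reached.
        Iterator<long[]> it = buffer.values().iterator();
        while (it.hasNext()) {
            long[] e = it.next();
            if (e[1] + sizeMs + graceMs <= streamTime) {
                emitted.add(new Result(e[0], e[1], e[2]));
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        SuppressModel m = new SuppressModel(10_000, 0);
        m.process(1, 1_000, 5);
        m.process(1, 4_000, 2);
        m.process(1, 9_000, 3);   // still nothing emitted: window [0, 10 000) is open
        m.process(1, 12_000, 7);  // stream time passes 10 000, so the final sum 10 is emitted
        m.process(1, 3_000, 100); // late record for the closed window: dropped
        System.out.println(m.emitted);
    }
}
```

Note the consequence for the window starting at 10 000 ms: it stays buffered because no later record has advanced stream time. Kafka Streams behaves the same way, since stream time advances only when new records arrive on the partition, not with wall-clock time, so if the input goes quiet the last window is not emitted until more data comes in.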
Stefan Repcek answered Sep 30 '22 08:09