I have written this code in a Kafka stream application:
KGroupedStream<String, foo> groupedStream = stream.groupByKey();
groupedStream.windowedBy(
SessionWindows.with(Duration.ofSeconds(3)).grace(Duration.ofSeconds(3)))
.aggregate(() -> {...})
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()...
which should (if i understood it correctly) emit records per Key after the window is closed. Somehow the behavior is the following:
The stream doesn't emit the first record and only forward it after the second record even with a different Key and then the second record is emitted only after the 3rd and so forth..
I have tried multiple StreamConfigs with "exactly_once" and with or without Caching also, this behavior persists.
Thanks in advance for your help !
That is expected behavior. Note, that suppress()
is based on event-time. Thus, as long as no new data arrives, time cannot advance and thus evicting the record earlier would be wrong, because there is no guarantee, that the next record might belong to the current window.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With