I am using Kafka Streams 2.2.1.
I am using suppress to hold back events until a window closes, with event-time semantics. However, the suppressed results are only emitted once a new message arrives on the stream.
The following code is extracted to reproduce the problem:
    KStream<UUID, String>[] branches = is
            .branch((key, msg) -> "a".equalsIgnoreCase(msg.split(",")[1]),
                    (key, msg) -> "b".equalsIgnoreCase(msg.split(",")[1]),
                    (key, value) -> true);
    KStream<UUID, String> sideA = branches[0];
    KStream<UUID, String> sideB = branches[1];
    KStream<Windowed<UUID>, String> sideASuppressed =
            sideA.groupByKey(Grouped.with(new MyUUIDSerde(), Serdes.String()))
                    .windowedBy(TimeWindows.of(Duration.ofMinutes(31)).grace(Duration.ofMinutes(32)))
                    .reduce((v1, v2) -> {
                        return v1;
                    })
                    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                    .toStream();
Messages are only emitted from 'sideASuppressed' when a new message arrives on the 'sideA' stream. Messages arriving on 'sideB' do not cause the suppress operator to emit anything, even if the window closed long ago. Although the problem is unlikely to occur often in production because of the high volume, there are enough cases where it is essential not to wait for a new message on the 'sideA' stream.
Thanks in advance.
According to the Kafka Streams documentation:
Stream-time is only advanced if all input partitions over all input topics have new data (with newer timestamps) available. If at least one partition does not have any new data available, stream-time will not be advanced and thus punctuate() will not be triggered if PunctuationType.STREAM_TIME was specified. This behavior is independent of the configured timestamp extractor, i.e., using WallclockTimestampExtractor does not enable wall-clock triggering of punctuate().
I am not sure why this is the case, but it explains why suppressed messages are only emitted when new messages are available on the stream that feeds the suppress operator.
If anyone knows why it is implemented this way, I would be happy to learn. This behavior forces my implementation to emit extra messages just to get the suppressed message emitted in time, which makes the code much less readable.
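To make the behavior concrete, here is a minimal plain-Java model of how I understand it. It is not Kafka Streams code: the class `SuppressModel` and its `process` method are hypothetical names, it handles a single key only, and it assumes that stream time is simply the largest record timestamp the suppress operator has seen so far. Under those assumptions a window result is released only when a later record pushes stream time to at least window end plus grace.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical single-key model of windowed reduce + suppress(untilWindowCloses).
// Assumption: stream time = max timestamp observed by this operator.
class SuppressModel {
    static final long WINDOW_MS = Duration.ofMinutes(31).toMillis();
    static final long GRACE_MS = Duration.ofMinutes(32).toMillis();

    private long streamTime = Long.MIN_VALUE;
    // window start -> first value seen, mirroring reduce((v1, v2) -> v1)
    private final TreeMap<Long, String> buffer = new TreeMap<>();

    // Feeds one record into the model and returns the results released by it.
    List<String> process(long timestampMs, String value) {
        // Only an incoming record can advance stream time.
        streamTime = Math.max(streamTime, timestampMs);

        long windowStart = timestampMs - (timestampMs % WINDOW_MS);
        boolean windowClosed = windowStart + WINDOW_MS + GRACE_MS <= streamTime;
        if (!windowClosed) {
            buffer.putIfAbsent(windowStart, value); // late records for closed windows are dropped
        }

        // Release every buffered window whose end + grace has been reached.
        List<String> emitted = new ArrayList<>();
        while (!buffer.isEmpty()
                && buffer.firstKey() + WINDOW_MS + GRACE_MS <= streamTime) {
            emitted.add(buffer.pollFirstEntry().getValue());
        }
        return emitted;
    }

    public static void main(String[] args) {
        SuppressModel sideA = new SuppressModel();
        System.out.println(sideA.process(0L, "first")); // prints []
        // Any amount of wall-clock time may pass here, and records on another
        // branch never call process(), so nothing is released in between.
        System.out.println(
                sideA.process(Duration.ofMinutes(63).toMillis(), "second")); // prints [first]
    }
}
```

In this model nothing ever happens between calls to `process`, which matches what I observe: records on 'sideB' never reach the suppress operator, so they cannot release anything.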
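For reference, this is roughly how the timestamp for such an extra message can be worked out. It is only a sketch: `HeartbeatTimestamp` and `closeTimeMs` are hypothetical names, and it assumes tumbling windows aligned to the epoch and non-negative timestamps. The extra message still has to flow through the same suppress operator to have any effect, and it opens a window of its own.

```java
import java.time.Duration;

// Hypothetical helper: earliest event time an extra record must carry so that
// the window containing `recordTimestampMs` is closed.
// Assumption: tumbling windows aligned to the epoch, non-negative timestamps.
class HeartbeatTimestamp {
    static long closeTimeMs(long recordTimestampMs, Duration windowSize, Duration grace) {
        long sizeMs = windowSize.toMillis();
        long windowStart = recordTimestampMs - (recordTimestampMs % sizeMs);
        // The window closes once stream time reaches window end + grace.
        return windowStart + sizeMs + grace.toMillis();
    }

    public static void main(String[] args) {
        long close = closeTimeMs(
                Duration.ofMinutes(10).toMillis(),
                Duration.ofMinutes(31),
                Duration.ofMinutes(32));
        System.out.println(Duration.ofMillis(close).toMinutes()); // prints 63
    }
}
```

With the 31 minute window and 32 minute grace from the question, a record at minute 10 falls into the first window, so the extra message needs an event time of at least minute 63.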