 

How do I receive the last windowed Kafka message in a windowedBy + aggregate when the producer stops sending messages in Java/Spring?

As I say in the title, I want to receive the last windowed messages when the producer stops sending messages. At the moment I am doing it manually, but first of all, a little description.

I have a Kafka producer that reads lines from a file (every line is a different JSON); each line read is sent to Kafka with a 500 ms pause between sends. There are only 120 lines (JSONs).

I have a consumer that consumes all the JSONs sent by the producer. The code:

  final KStream<String, Aggregate> transactions = builder.stream(kafkaProperties.getTopic(), Consumed.with(Serdes.String(), aggregateSerde));

  // Topology: 10-second tumbling windows, no grace period, final results only
  transactions
        .groupBy(this::groupedByTimeStampAndProtocolName)
        .windowedBy( TimeWindows
                .of( Duration.ofSeconds( 10 ))
                .grace( Duration.ofMillis( 0 )))
        .aggregate(
                tool::emptyAggregate,
                this::processNewRecord, //new TransactionAggregator(),
                Materialized.<String, Aggregate, WindowStore<Bytes, byte[]>>as(TRANSACTION_AGGREGATE)
                        .withKeySerde(Serdes.String())
                        .withValueSerde(aggregateSerde)
        )
        // suppress() holds each window's result back until the window closes
        // (stream-time passes window end + grace) and then emits it exactly once
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .foreach(sendAggregatesToCassandra);

I get the expected functionality, I mean, it receives all the records, but to receive the last windowed messages I have to send records manually.

Two questions about this:

  1. Is there any way to automatically process the last window? When the producer sends the last record (the 120th JSON) it won't send any more records. It doesn't matter if I have to wait some time or whatever.
  2. I have seen that I must send 3 more records to process the last window. It isn't clear to me why I must send 3 records (if I send fewer than 3, the last window isn't consumed completely). Is there any way to send only one record? Change the buffer? Change some property?

I am using Kafka Streams (with Spring) on JDK 11 and I am working with dockerized Kafka:

  • confluentinc/cp-kafka:5.5.1
  • zookeeper:3.4.14
  • Kafka:
            <version.kafka>2.5.0</version.kafka>

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-streams</artifactId>
                <version>${version.kafka}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>${version.kafka}</version>
            </dependency>

The properties used in the Kafka Streams consumer are:

  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getAppId()+Constants.APP_ID);
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass());
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

And on the producer side:

  properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  properties.put(ProducerConfig.ACKS_CONFIG, "all");
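
For context, this is roughly what the producer loop does (a minimal sketch, not the exact code; the file name "transactions.json" and the topic name "transactions" are placeholders):

  import java.nio.file.Files;
  import java.nio.file.Path;
  import java.util.List;
  import java.util.Properties;

  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerConfig;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.common.serialization.StringSerializer;

  public class FileReplayProducer {

      public static void main(String[] args) throws Exception {
          Properties properties = new Properties();
          properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
          properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
          properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
          properties.put(ProducerConfig.ACKS_CONFIG, "all");

          // Each line of the file is one JSON document; send one line every 500 ms.
          List<String> lines = Files.readAllLines(Path.of("transactions.json"));

          try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
              for (String json : lines) {
                  producer.send(new ProducerRecord<>("transactions", null, json));
                  Thread.sleep(500); // 500 ms between records
              }
              producer.flush();
          }
      }
  }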

Please, could you help me?

asked Dec 19 '25 10:12 by David


1 Answer

As you are using suppress() (with the untilWindowCloses config), the operator will only emit a final result if "stream-time" advances. "Stream-time" is computed as a function over the record timestamps, and thus, if no records are processed, "stream-time" does not advance and suppress() will never emit anything. Thus, sending more records is the only way "stream-time" can be advanced.

Note: for a streaming use case, the assumption is that data never stops, and thus this is not an issue for an actual deployment. Reading from a file as you do is not a real stream-processing use case: I assume you read from a file for a test, and for this case your input file should contain a few more records to advance stream-time accordingly.
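
As a sketch of that workaround (this is not code from the question): once the window size plus grace period (10 s + 0 ms) has passed since the last real record, send a few extra "flush" records so that stream-time advances and suppress() emits the final result. The topic name, the "flush" key, and the "{}" payload are placeholders; the dummy records must be something your aggregateSerde can read, or must be filtered out downstream.

  import java.time.Duration;
  import java.util.Properties;

  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerConfig;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.common.serialization.StringSerializer;

  public class WindowFlusher {

      public static void main(String[] args) throws Exception {
          Properties properties = new Properties();
          properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
          properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
          properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
          properties.put(ProducerConfig.ACKS_CONFIG, "all");

          // Wait until the 10-second window plus the 0 ms grace period has passed
          // since the last real record; with the WallclockTimestampExtractor used in
          // the question, the next records processed will carry a wall-clock
          // timestamp past the window end.
          Thread.sleep(Duration.ofSeconds(11).toMillis());

          try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
              // Send a few marker records so stream-time advances and suppress()
              // forwards the final aggregate; key "flush" and value "{}" are
              // placeholders that should be ignored downstream.
              for (int i = 0; i < 3; i++) {
                  producer.send(new ProducerRecord<>("transactions", "flush", "{}"));
                  Thread.sleep(500);
              }
              producer.flush();
          }
      }
  }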

For more details, check out this blog post: https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/

I also did a Kafka Summit talk about this topic: https://www.confluent.io/resources/kafka-summit-2020/the-flux-capacitor-of-kafka-streams-and-ksqldb/

answered Dec 20 '25 23:12 by Matthias J. Sax


