Apache Kafka : commitSync after pause

In our code, we plan to commit offsets manually. Our data processing is long-running, so we follow the pattern suggested earlier:

  1. Read the records
  2. Process the records in a separate thread
  3. Pause the consumer
  4. Keep polling the paused consumer so that it stays alive
  5. When the records are processed, commit the offsets
  6. When the commit is done, resume the consumer

The code looks roughly like this:

    while (true) {
        // Keep polling even while paused so the consumer stays alive.
        ConsumerRecords<String, String> records = consumer.poll(kafkaConfig.getTopicPolling());
        if (!records.isEmpty()) {
            // Hand the batch to a worker thread.
            task = pool.submit(new ProcessorTask(processor, createRecordsList(records)));
        }
        if (shouldPause(task)) {
            consumer.pause(listener.getPartitions());
        }
        if (isDoneProcessing(task)) {
            consumer.commitSync();  // no arguments: which offsets get committed?
            consumer.resume(listener.getPartitions());
        }
    }

Note that we commit using commitSync() without any parameters. Since the consumer is paused, the following iterations return no records, and commitSync() only runs later, once processing has finished. In that case, which offsets would it try to commit? I have read the definitive guide and searched online but cannot find any information about this.

I think we should save the offsets explicitly, but I am not sure whether the current code is actually a problem.

Any information would be helpful.

Thanks, Prateek

Prateek Negi asked May 31 '26 10:05



1 Answer

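If you call consumer.commitSync() with no parameters, it commits the offsets returned by the last poll() for the consumer's assigned partitions, that is, the consumer's current position (the last offset received plus one). While a partition is paused, poll() returns no records for it and the position does not advance, so a later commitSync() still refers to the last batch you actually received. Since a single poll() can return many messages, you might want finer control over the commit and explicitly commit a specific offset, such as that of the latest message your consumer has successfully processed. This can be done by calling commitSync(Map<TopicPartition,OffsetAndMetadata> offsets). The committed offset should be the next message to consume, i.e. the last processed offset + 1.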

Both ways of calling commitSync are documented in the KafkaConsumer Javadoc: http://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync()
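
As a rough illustration of the explicit-commit variant, here is a minimal sketch of how the offsets to commit could be derived from the records a worker has finished processing. It deliberately avoids the Kafka client classes so it can run on its own: OffsetTracker, ProcessedRecord and offsetsToCommit are hypothetical names, not part of Kafka, and the "topic-partition" string key is just a stand-in for TopicPartition. With the real client, each entry would presumably be turned into a TopicPartition(topic, partition) key and an OffsetAndMetadata(offset) value before being passed to commitSync(Map<TopicPartition,OffsetAndMetadata>).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of the Kafka client): derives the offset to
// commit for each partition from the records that finished processing.
class OffsetTracker {

    // Stand-in for the ConsumerRecord fields that matter for committing.
    static final class ProcessedRecord {
        final String topic;
        final int partition;
        final long offset;

        ProcessedRecord(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Returns "topic-partition" -> offset to commit, where the offset is the
    // highest processed offset in that partition plus one (the next record
    // the consumer should read after a restart or rebalance).
    static Map<String, Long> offsetsToCommit(List<ProcessedRecord> processed) {
        Map<String, Long> result = new HashMap<>();
        for (ProcessedRecord r : processed) {
            String key = r.topic + "-" + r.partition;
            result.merge(key, r.offset + 1, Long::max);
        }
        return result;
    }
}
```

In the loop from the question, the worker task could return such a map when it completes, and the polling thread would commit it before calling resume(). This assumes the records within a partition are processed in order; if they are not, committing the highest offset could skip unprocessed records.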

Hans Jespersen answered Jun 04 '26 13:06



