 

What is the correct way to commit after processing each record retrieved from Kafka?

I'm having a bit of trouble understanding how to manually commit properly for each record I consume.

First, let's look at an example from the KafkaConsumer javadoc (https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html):

final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);
        // Commit only once the whole buffered batch has been persisted.
        consumer.commitSync();
        buffer.clear();
    }
}

This example commits only after the whole buffered batch has been processed. I don't think this is a great approach: if we receive three records and my service dies while processing the second one, the first record will be consumed again after a restart, which is incorrect for my use case.

So there's a second example that covers committing records on a per-partition basis:

try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : partitionRecords) {
                System.out.println(record.offset() + ": " + record.value());
            }
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }
} finally {
    consumer.close();
}

However, I think this suffers from the same problem: it only commits after processing all the records received for a particular partition.

The solution I have managed to come up with is this:

val consumer: Consumer<String, MyEvent> = createConsumer(bootstrap)
consumer.subscribe(listOf("some-topic"))

while (true) {
    val records: ConsumerRecords<String, MyEvent> = consumer.poll(Duration.ofSeconds(1))
    if (!records.isEmpty) {
        mainLogger.info("Received ${records.count()} events from CRS kafka topic, with partitions ${records.partitions()}")
        records.forEach {
            mainLogger.debug("Record at offset ${it.offset()}, ${it.value()}")
            processEvent(it.value()) // Complex event processing occurs in this function
            consumer.commitSync(mapOf(TopicPartition(it.topic(), it.partition()) to OffsetAndMetadata(it.offset() + 1)))
        }
    }
}

This seems to work in my testing. So far, though, only one partition appears to be in use (I have checked this by logging records.partitions()).

Is this approach going to cause any issues? The Consumer API does not seem to provide a way to commit an offset without specifying a partition, and this seems a bit odd to me. Am I missing something here?

Ogre asked Aug 02 '19


1 Answer

There's no right or wrong way to commit. It really depends on your use case and application.

Committing every offset gives more granular control, but it comes at a performance cost. At the other end of the spectrum, you could commit asynchronously every X seconds (as auto commit does) and have very little overhead but a lot less control.
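For illustration, the low-overhead end of that spectrum looks roughly like this. This is a minimal Kotlin sketch, not code from the answer; the broker address, group id, and topic name are placeholders:

import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer

fun main() {
    // Consumer configured to commit offsets automatically in the background.
    val props = Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ConsumerConfig.GROUP_ID_CONFIG, "my-group")
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer")
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer")
        put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
        // Commit roughly every 5 seconds.
        put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000")
    }

    KafkaConsumer<String, String>(props).use { consumer ->
        consumer.subscribe(listOf("some-topic"))
        while (true) {
            val records = consumer.poll(Duration.ofSeconds(1))
            records.forEach { println("${it.offset()}: ${it.value()}") }
            // No explicit commit: offsets are committed for us during poll().
        }
    }
}

With this configuration the commits happen in the background during poll(), so a crash can replay up to the last interval's worth of records; that is the "less control" side of the trade-off.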


In the first example, events are processed and committed in batches. That's interesting in terms of performance, but in case of error the full batch could be reprocessed.

In the second example, it's also batching, but only per partition. This should lead to smaller batches, so lower performance but less reprocessing in case things go wrong.

In your last example, you chose to commit every single message. While this gives the most control, it significantly affects performance. In addition, like the other cases, it's not fully error-proof.

If the application crashes after an event is processed but before its offset is committed, that last event is likely to be reprocessed upon restarting (i.e. at-least-once semantics). But at least, only one event should be affected.
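If the blocking commit per record turns out to be the bottleneck, one possible middle ground, not mentioned above and offered only as a hedged sketch, is to commit each offset asynchronously and synchronously flush the last processed offsets on shutdown. Here processEvent() is a hypothetical stand-in for the question's processing function:

import java.time.Duration
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

// Hypothetical stand-in for the asker's complex event processing.
fun processEvent(value: String) { /* ... */ }

fun consumeLoop(consumer: Consumer<String, String>, running: () -> Boolean) {
    // Tracks the next offset to commit for each partition we have fully processed.
    val processed = mutableMapOf<TopicPartition, OffsetAndMetadata>()
    try {
        while (running()) {
            val records = consumer.poll(Duration.ofSeconds(1))
            for (record in records) {
                processEvent(record.value())
                val tp = TopicPartition(record.topic(), record.partition())
                val next = OffsetAndMetadata(record.offset() + 1)
                processed[tp] = next
                // Non-blocking: the commit request is sent in the background.
                consumer.commitAsync(mapOf(tp to next)) { _, error ->
                    if (error != null) println("Commit failed: $error")
                }
            }
        }
    } finally {
        // Synchronously commit exactly what was processed before closing.
        consumer.commitSync(processed)
        consumer.close()
    }
}

The processed map ensures the final commitSync only covers records that were actually handled, so the at-least-once guarantee is preserved while the hot path never blocks on a commit.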

If you want exactly-once semantics, you need to use the transactional producer.
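As a rough illustration, a consume-transform-produce loop with the transactional producer could look like the Kotlin sketch below. It uses the standard client API (Kafka 2.5+ for groupMetadata()); the broker address, topic names, and transactional id are placeholders, and note that transactions only give exactly-once when the results are written back to Kafka, not to an external database:

import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition

fun main() {
    val consumerProps = Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ConsumerConfig.GROUP_ID_CONFIG, "my-group")
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer")
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer")
        put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
        // Only read records from committed transactions.
        put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
    }
    val producerProps = Properties().apply {
        put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")
        put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id")
    }

    val consumer = KafkaConsumer<String, String>(consumerProps)
    val producer = KafkaProducer<String, String>(producerProps)
    consumer.subscribe(listOf("input-topic"))
    producer.initTransactions()

    while (true) {
        val records = consumer.poll(Duration.ofSeconds(1))
        if (records.isEmpty) continue
        producer.beginTransaction()
        try {
            val offsets = mutableMapOf<TopicPartition, OffsetAndMetadata>()
            for (record in records) {
                // Output records and offset commits succeed or fail together.
                producer.send(ProducerRecord("output-topic", record.key(), record.value()))
                offsets[TopicPartition(record.topic(), record.partition())] =
                    OffsetAndMetadata(record.offset() + 1)
            }
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata())
            producer.commitTransaction()
        } catch (e: Exception) {
            // Simplified error handling; fatal errors should close the producer instead.
            producer.abortTransaction()
        }
    }
}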

Mickael Maison answered Sep 30 '22