 

Consumer.poll() returns new records even without committing offsets?

If I have enable.auto.commit=false and I call consumer.poll() without calling consumer.commitAsync() afterwards, why does consumer.poll() return new records the next time it's called?

Since I did not commit my offset, I would expect poll() to start from the last committed offset, which should mean it returns the same records again.

I'm asking because I'm trying to handle failure scenarios during processing. I was hoping that, without committing the offset, poll() would return the same records so I can re-process the failed ones.

public class MyConsumer implements Runnable {
    private final KafkaConsumer<String, LogLine> consumer;

    public MyConsumer(KafkaConsumer<String, LogLine> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, LogLine> records = consumer.poll(Long.MAX_VALUE);
            for (ConsumerRecord<String, LogLine> record : records) {
                try {
                    // process record
                    consumer.commitAsync();
                } catch (Exception e) {
                    // swallow and move on to the next record
                }
                /*
                 * If an exception happens above, I was expecting poll() to
                 * return the same records so I can re-process the record
                 * that caused the exception.
                 */
            }
        }
    }
}
asked Apr 19 '17 by Glide


People also ask

How does Kafka consumer poll work?

The Kafka consumer poll() method fetches records in sequential order from the specified topics/partitions. This poll() method is how Kafka clients read data from Kafka. When poll() is called, the consumer fetches records starting from the last consumed offset.
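
For illustration, here is a minimal sketch of such a loop (broker address, group id, topic name, and deserializers are placeholder choices):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("logs"));
    while (true) {
        // Fetches from the consumer's current position, i.e. right after
        // the last record returned by the previous poll().
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset=%d key=%s value=%s%n",
                    record.offset(), record.key(), record.value());
        }
    }
}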

What happens when you call poll on Kafka consumer?

The first call to poll() creates any threads necessary, connects to the servers, joins the consumer group, etc. The consumer is not thread safe: you can't call its methods from different threads at the same time, or you'll get an exception.
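
The one documented exception is wakeup(), which is safe to call from another thread and is the usual way to break a consumer out of a long poll(). A sketch of that shutdown pattern, assuming a consumer variable is in scope:

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;

// From another thread (here, a JVM shutdown hook):
Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));

// In the polling thread:
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        // ... process records ...
    }
} catch (WakeupException e) {
    // expected on shutdown; fall through to close()
} finally {
    consumer.close();
}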

How does a consumer commit offsets in Kafka?

The Kafka consumer commits the offset periodically when polling batches, as described above. This strategy works well if the message processing is synchronous and failures are handled gracefully. Be aware that, starting with Quarkus 1.9, auto commit is disabled by default, so you need to enable it explicitly.
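
With the plain Java client, the corresponding settings look like this (values are illustrative; 5000 ms is the client's usual default interval):

props.put("enable.auto.commit", "true");      // commit offsets automatically
props.put("auto.commit.interval.ms", "5000"); // at most every 5 seconds, during poll()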

Which consumer in Kafka will commit the current offset?

By default, as the consumer reads messages from Kafka, it will periodically commit its current offset (defined as the offset of the next message to be read) for the partitions it is reading from back to Kafka.
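
The committed offset and the consumer's in-memory position are tracked separately, and you can observe both directly. A sketch, assuming the consumer already has partitions assigned (i.e. after at least one poll()):

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

for (TopicPartition tp : consumer.assignment()) {
    long position = consumer.position(tp);                // next offset poll() will fetch
    OffsetAndMetadata committed = consumer.committed(tp); // last committed offset, or null
    System.out.printf("%s position=%d committed=%s%n", tp, position, committed);
}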

Does it make sense to poll records if there are no topics?

It does not make sense to poll records if you don't specify any topics, does it? After subscribing, the consumer starts a record-fetching loop that runs until the poll timeout expires or the consumer receives some records. You can interrupt the consumer in the middle of polling if you want to shut it down.

Why does the consumer leave the group when I call poll?

As a precaution, the consumer tracks how often you call poll(), and if you exceed a specified interval (max.poll.interval.ms) it leaves the group, so other consumers can take over processing.
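
Two settings commonly tuned together to avoid that (values are illustrative):

props.put("max.poll.interval.ms", "600000"); // allow up to 10 minutes between poll() calls
props.put("max.poll.records", "100");        // smaller batches, so each one is processed faster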

How to use Kafka consumer to poll messages?

Using a Kafka consumer usually follows a few simple steps, the main one being to poll messages in some kind of loop. If you head over to the Consumer class in the sample repository, you'll find that its run method does exactly that. Let's break down every step and see what is done underneath.

How long does poll() wait for synchronization with the Kafka cluster?

Depending on which poll() you call - the one taking a long or the one taking a Duration as a parameter - it will wait for synchronization with the Kafka cluster either indefinitely or for a limited amount of time.
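
For example, reusing the consumer from above (the long overload is deprecated since Kafka 2.0 in favor of the Duration one):

// May block beyond the timeout while waiting for partition metadata:
ConsumerRecords<String, String> a = consumer.poll(Long.MAX_VALUE);

// Bounds the total wait, including metadata synchronization (Kafka 2.0+):
ConsumerRecords<String, String> b = consumer.poll(Duration.ofSeconds(5));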


2 Answers

The starting offset of a poll is not decided by the broker but by the consumer. The consumer tracks the last received offset and asks for the next batch of messages during the next poll.

Offset commits come into play when a consumer stops or fails and another instance that is not aware of the last consumed offset picks up consumption of a partition.

KafkaConsumer has pretty extensive Javadoc that is well worth a read.
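
If you want the same consumer instance to see a failed record again, you have to rewind its position yourself with seek(). A sketch of that pattern applied to the question's loop (process() is a placeholder; handling of batches spanning several partitions is simplified):

import java.time.Duration;
import org.apache.kafka.common.TopicPartition;

while (true) {
    ConsumerRecords<String, LogLine> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, LogLine> record : records) {
        try {
            process(record);       // placeholder for the actual processing
            consumer.commitAsync();
        } catch (Exception e) {
            // Rewind this partition to the failed record so the next poll()
            // returns it (and everything after it) again.
            consumer.seek(new TopicPartition(record.topic(), record.partition()),
                    record.offset());
            break; // simplified: with several partitions in one batch you
                   // would seek each partition you want to re-read
        }
    }
}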

answered Sep 30 '22 by ftr


The consumer will read from the last committed offset if it gets rebalanced (i.e. if any consumer leaves the group or a new consumer is added) or if the app restarts. Handling de-duplication is therefore not straightforward in Kafka. You either have to store the last processed offset in an external store and, when a rebalance happens or the app restarts, seek to that offset before resuming processing, or check some unique key in each message against the DB to detect duplicates.
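
A sketch of the external-store approach, where OffsetStore is a hypothetical wrapper around your database:

import java.util.Collection;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

consumer.subscribe(Collections.singletonList("logs"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            offsetStore.save(tp, consumer.position(tp)); // hypothetical external store
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            consumer.seek(tp, offsetStore.load(tp)); // resume from last processed offset
        }
    }
});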

answered Sep 30 '22 by kapil07