If I have a enable.auto.commit=false
and I call consumer.poll()
without calling consumer.commitAsync()
after, why does consumer.poll()
return
new records the next time it's called?
Since I did not commit my offset, I would expect poll()
would return the latest offset which should be the same records again.
I'm asking because I'm trying to handle failure scenarios during my processing. I was hoping without committing the offset, the poll()
would return the same records again so I can re-process those failed records again.
public class MyConsumer implements Runnable {
@Override
public void run() {
while (true) {
ConsumerRecords<String, LogLine> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord record : records) {
try {
//process record
consumer.commitAsync();
} catch (Exception e) {
}
/**
If exception happens above, I was expecting poll to return new records so I can re-process the record that caused the exception.
**/
}
}
}
}
The Kafka consumer poll() method fetches records in sequential order from a specified topic/partitions. This poll() method is how Kafka clients read data from Kafka. When the poll() method is called, the consumer will fetch records from the last consumed offset.
It creates any threads necessary, connects to servers, joins the group, etc. Consumer is not thread safe - you can't call its methods from different threads at the same time or else you'll get an exception.
The Kafka consumer commits the offset periodically when polling batches, as described above. This strategy works well if the message processing is synchronous and failures handled gracefully. Be aware that starting Quarkus 1.9, auto commit is disabled by default. So you need to explicitly enable it.
By default, as the consumer reads messages from Kafka, it will periodically commit its current offset (defined as the offset of the next message to be read) for the partitions it is reading from back to Kafka.
It does not make sense to poll records if you don’t specify any topics, does it? Line 8 - Start a record-fetching loop until poll timeout doesn’t expire or consumer receives some records. Line 9 - You can interrupt consumer in the middle of polling if you want to shut it down.
As a precaution, Consumer tracks how often you call poll and if you exceed some specified time ( max.poll.interval.ms ), then it leaves the group, so other consumers can move processing further. You’re still asking why?
Using Kafka consumer usually follows few simple steps. Poll messages in some kind of loop. If you head over to Consumer class in the sample repository, you’ll find that the run method does exactly that: Let’s break down every step and see what is done underneath.
Depending, which poll you call - the one taking long or Duration as parameter it will wait for synchronization with Kafka Cluster indefinitely or for a limited amount of time.
The starting offset of a poll is not decided by the broker but by the consumer. The consumer tracks the last received offset and asks for the following bunch of messages during the next poll.
Offset commits come into play when a consumer stops or fails and another instance that is not aware of the last consumed offset picks up consumption of a partition.
KafkaConsumer has pretty extensive Javadoc that is well worth a read.
Consumer will read from last commit offset if it get re balanced (means if any consumer leave the group or new consumer added) so handling de-duplication does not come straight forward in kafka so you have to store the last process offset in external store and when rebalance happens or app restart you should seek to that offset and start processing or you should check against some unique key in message against DB to find is dublicate
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With