I am using Kafka 0.10.2 and have run into a CommitFailedException:
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
I have set max.poll.interval.ms to Integer.MAX_VALUE. Can anyone tell me why this still happens even though I have set that value?
Another question: following the advice in the message, I set session.timeout.ms to 60000, and it still happens. I tried to reproduce it with this simple code:
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger; // assumed: log4j 1.x, which provides getLogger(Class)

public class KafkaConsumer10 {
    public static void main(String[] args) throws InterruptedException {
        Logger logger = Logger.getLogger(KafkaConsumer10.class);
        logger.info("XX");
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-broker:9098");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.interval.ms", "300000");
        props.put("session.timeout.ms", "10000");
        props.put("max.poll.records", "2");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("t1"));
        while (true) {
            // Sleep longer than session.timeout.ms (10000 ms) before polling
            Thread.sleep(11000);
            ConsumerRecords<String, String> records = consumer.poll(100);
            //Thread.sleep(11000);
            // Sleep again after polling to simulate slow processing
            Thread.sleep(11000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}
With session.timeout.ms set to 10000, I sleep for more than 10000 ms in my poll loop, but it seems to work and no exception is thrown, so I'm confused. If heartbeats are triggered by consumer.poll and consumer.commit, then the gap between heartbeats in my code exceeds the session timeout. Why is no CommitFailedException thrown?
This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing.
You are getting this exception because your consumer has been kicked out of the consumer group. If you commit offsets with commitAsync, retries are not automatic, and you may instead receive a RetriableCommitFailedException, which indicates a potentially transient error after which you can retry the commit manually.
max.poll.interval.ms: By increasing the interval between expected polls, you can give the consumer more time to handle a batch of records returned from poll(long). The drawback is that increasing this value may delay a group rebalance, since the consumer will only join the rebalance inside the call to poll.
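To make the interval concrete for the reproduction code in the question, here is a minimal sketch in plain Java (no Kafka calls) that compares the time between two poll() calls with max.poll.interval.ms. The class and method names are hypothetical. It also rests on an assumption worth checking against your client version: as far as I know, since 0.10.1 heartbeats are sent from a background thread, so sleeping past session.timeout.ms inside the loop does not by itself get the consumer evicted; the gap between polls is what matters.

```java
import java.util.Properties;

final class PollGapCheck {
    // Hypothetical helper, not part of the Kafka client: returns true when the
    // time between two poll() calls exceeds the configured max.poll.interval.ms.
    static boolean exceedsMaxPollInterval(long gapBetweenPollsMs, Properties props) {
        // 300000 ms is the client's default for max.poll.interval.ms.
        long maxPollIntervalMs =
                Long.parseLong(props.getProperty("max.poll.interval.ms", "300000"));
        return gapBetweenPollsMs > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("max.poll.interval.ms", "300000");
        props.setProperty("session.timeout.ms", "10000");

        // The loop in the question sleeps 11000 ms after poll() and another
        // 11000 ms before the next poll(), so the gap is roughly 22000 ms.
        long gapMs = 11000 + 11000;
        System.out.println(exceedsMaxPollInterval(gapMs, props)); // prints "false"
    }
}
```

Under that assumption, a 22-second gap is well inside the 300-second limit, which would explain why the test loop never fails even though it sleeps longer than session.timeout.ms.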
Using auto-commit gives you “at least once” delivery: Kafka guarantees that no messages will be missed, but duplicates are possible. Auto-commit works much like a cron job whose period is set through the auto.commit.interval.ms configuration property.
The session.timeout.ms set on the consumer should be less than the group.max.session.timeout.ms set on the Kafka broker.
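A small sketch of that constraint, assuming (to my knowledge) that the broker accepts a consumer's session.timeout.ms only when it lies between group.min.session.timeout.ms and group.max.session.timeout.ms, inclusive. The helper is hypothetical and the broker values below are placeholders; read the real ones from your broker's server.properties.

```java
final class SessionTimeoutCheck {
    // Hypothetical helper: mirrors the range check the broker is assumed to
    // apply when a consumer joins the group.
    static boolean isAccepted(int sessionTimeoutMs, int brokerMinMs, int brokerMaxMs) {
        return sessionTimeoutMs >= brokerMinMs && sessionTimeoutMs <= brokerMaxMs;
    }

    public static void main(String[] args) {
        // Placeholder broker settings, chosen only for illustration.
        int brokerMinMs = 6000;   // group.min.session.timeout.ms
        int brokerMaxMs = 30000;  // group.max.session.timeout.ms

        System.out.println(isAccepted(10000, brokerMinMs, brokerMaxMs)); // prints "true"
        System.out.println(isAccepted(60000, brokerMinMs, brokerMaxMs)); // prints "false"
    }
}
```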
This resolved the issue for me.
Credit to github link Commit Failures
Hi. To avoid this, handle the rebalance in your code: finish processing the in-flight message and commit it before the partitions are revoked.
For example:
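// Imports needed at the top of the enclosing file:
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

private class HandleRebalance implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Implement what you want to do once rebalancing is done.
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit the offsets of the records processed so far, e.g. with commitSync().
    }
}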
Then subscribe to the topic with this syntax:
kafkaConsumer.subscribe(topicNameList, new HandleRebalance());
The advantages of doing this:
Messages are not reprocessed while a rebalance is taking place.
No CommitFailedException is thrown.
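One detail the listener leaves open is which offsets to commit in onPartitionsRevoked. A common approach is to remember, per partition, the offset of the last processed record and commit that offset plus one, since a committed offset names the next record to read. The sketch below is plain Java with hypothetical names (ProcessedOffsets, string partition keys); in real code the map would presumably be keyed by TopicPartition and handed to commitSync as OffsetAndMetadata values.

```java
import java.util.HashMap;
import java.util.Map;

final class ProcessedOffsets {
    // Highest processed offset per partition; keys such as "t1-0" are placeholders.
    private final Map<String, Long> lastProcessed = new HashMap<>();

    // Call after a record has been fully processed.
    void markProcessed(String partition, long offset) {
        lastProcessed.merge(partition, offset, Long::max);
    }

    // Offsets to commit: last processed offset + 1 for each partition.
    Map<String, Long> offsetsToCommit() {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> entry : lastProcessed.entrySet()) {
            result.put(entry.getKey(), entry.getValue() + 1);
        }
        return result;
    }

    public static void main(String[] args) {
        ProcessedOffsets tracker = new ProcessedOffsets();
        tracker.markProcessed("t1-0", 40L);
        tracker.markProcessed("t1-0", 41L);
        System.out.println(tracker.offsetsToCommit()); // prints "{t1-0=42}"
    }
}
```

In the poll loop you would call markProcessed after each record, and in onPartitionsRevoked commit whatever offsetsToCommit returns before giving up the partitions.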