 

CommitFailedException Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member

I am using Kafka 0.10.2 and have just run into a CommitFailedException:

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

I have set max.poll.interval.ms to Integer.MAX_VALUE, so can anyone tell me why this still happens even though I have set that value?

Another question: as the message suggests, I set session.timeout.ms to 60000 and it still happens. I tried to reproduce the problem with this simple code:

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.log4j.Logger;

    public class KafkaConsumer10 {
        public static void main(String[] args) throws InterruptedException {
            Logger logger = Logger.getLogger(KafkaConsumer10.class);
            logger.info("XX");
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka-broker:9098");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("max.poll.interval.ms", "300000");
            props.put("session.timeout.ms", "10000");
            props.put("max.poll.records", "2");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("t1"));
            while (true) {
                Thread.sleep(11000);   // sleep longer than session.timeout.ms before polling
                ConsumerRecords<String, String> records = consumer.poll(100);
                Thread.sleep(11000);   // and again after polling
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
            }
        }
    }

When I set session.timeout.ms to 10000 and sleep for more than 10000 ms in my poll loop, it still seems to work and no exception is thrown, so I'm confused. If the heartbeat is driven by consumer.poll() and consumer.commit(), the heartbeat should have fallen outside the session timeout in my code. Why is no CommitFailedException thrown?

asked Aug 08 '17 by Simon Su


People also ask

What does it mean when the time between poll() calls is longer than the configured max.poll.interval.ms?

This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing.

What if Kafka commit fails?

You are getting this exception because your consumer has been kicked out of the consumer group. If you were using commitAsync to commit offsets, retries are not automatic: you may receive a RetriableCommitFailedException to indicate a potentially transient error for which you can manually retry the commit.
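
For illustration, a minimal sketch of that manual retry, assuming an already-subscribed KafkaConsumer named consumer (the variable and the error handling are my assumptions, not from the answer):

    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
    import org.apache.kafka.common.TopicPartition;

    // Assumes `consumer` is an already-subscribed KafkaConsumer<String, String>.
    consumer.commitAsync((Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) -> {
        if (exception instanceof RetriableCommitFailedException) {
            consumer.commitSync(offsets);   // transient failure: one blocking retry
        } else if (exception != null) {
            // Non-retriable (e.g. the group already rebalanced): log and move on.
            System.err.println("Commit failed for " + offsets + ": " + exception);
        }
    });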

What is max.poll.interval.ms?

max.poll.interval.ms: by increasing the interval between expected polls, you can give the consumer more time to handle a batch of records returned from poll(long). The drawback is that increasing this value may delay a group rebalance, since the consumer will only join the rebalance inside a call to poll.
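
As a rough illustration of that trade-off (the property names are real consumer settings; the concrete values are arbitrary):

    Properties props = new Properties();
    // Give each batch up to 10 minutes of processing time between poll() calls
    // (the default is 300000 ms, i.e. 5 minutes, on clients with KIP-62).
    props.put("max.poll.interval.ms", "600000");
    // ...and/or shrink the batch so each poll() returns less work.
    props.put("max.poll.records", "100");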

What is auto-commit in Kafka?

Using auto-commit gives you “at least once” delivery: Kafka guarantees that no messages will be missed, but duplicates are possible. Auto-commit basically works as a cron with a period set through the auto.commit.interval.ms configuration property.
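
A minimal sketch of that configuration (the values are illustrative, not prescribed):

    Properties props = new Properties();
    props.put("enable.auto.commit", "true");
    // Offsets are committed roughly once a second, piggybacked on poll().
    props.put("auto.commit.interval.ms", "1000");
    // If the consumer dies between two auto-commits, records processed since
    // the last commit are redelivered after restart: at-least-once delivery.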


2 Answers

The session.timeout.ms set on the consumer must not exceed the group.max.session.timeout.ms configured on the Kafka broker.

This resolved the issue for me.

Credit: the GitHub issue "Commit Failures".
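
To illustrate the constraint, the two settings side by side (the broker values shown are, to my knowledge, the defaults since Kafka 0.10.1; check your broker's server.properties):

    // Consumer side: must fall inside the broker's allowed range, otherwise
    // the consumer is rejected when it tries to join the group.
    Properties props = new Properties();
    props.put("session.timeout.ms", "60000");

    // Broker side (server.properties):
    //   group.min.session.timeout.ms=6000     (assumed default)
    //   group.max.session.timeout.ms=300000   (assumed default)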

answered Oct 13 '22 by Rahul Teke


For this you need to handle rebalancing in your code: finish processing the in-flight messages and commit their offsets before the rebalance completes, using a ConsumerRebalanceListener like this:

private class HandleRebalance implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Implement whatever should happen once rebalancing is done.
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit the offsets processed so far, before the partitions are
        // handed to another consumer (see the full sketch below).
    }
}

and use this overload of subscribe to attach the listener to the topic:

kafkaConsumer.subscribe(topicNameList, new HandleRebalance());

The advantages of doing this (a fuller sketch follows the list):

  1. Messages are not reprocessed when a rebalance takes place.

  2. No CommitFailedException is thrown.
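
A fuller, self-contained sketch of this pattern. The topic name "t1", the class name RebalanceAwareConsumer, and the no-op processing step are my placeholders; the offset-tracking map mirrors the approach above but is not from the original answer:

    import java.util.Collection;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class RebalanceAwareConsumer {
        // Offsets of records processed so far, committed on rebalance.
        private final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
        private final KafkaConsumer<String, String> consumer;

        public RebalanceAwareConsumer(Properties props) {
            this.consumer = new KafkaConsumer<>(props);
        }

        private class HandleRebalance implements ConsumerRebalanceListener {
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Nothing to restore in this sketch.
            }

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Commit what has been processed so far before the partitions move.
                consumer.commitSync(currentOffsets);
            }
        }

        public void run() {
            consumer.subscribe(Collections.singletonList("t1"), new HandleRebalance());
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        // ... process the record here ...
                        currentOffsets.put(
                                new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1));
                    }
                    consumer.commitAsync(currentOffsets, null);  // non-blocking commit
                }
            } finally {
                try {
                    consumer.commitSync(currentOffsets);  // best-effort final commit
                } finally {
                    consumer.close();
                }
            }
        }
    }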

answered Oct 13 '22 by Abhimanyu