Kafka CommitFailedException consumer exception

After creating multiple consumers (using the Kafka 0.9 Java API) and starting each thread, I'm getting the following exception:

Consumer has failed with exception: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
class com.messagehub.consumer.Consumer is shutting down.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:546)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:487)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:681)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:654)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:350)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:288)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:157)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:352)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:936)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:905)

After that, the consumers start consuming messages normally. I would like to know what is causing this exception in order to fix it.

asked Feb 26 '16 by Hugo Carmona

People also ask

What happens if Kafka consumer does not commit?

However, if the last commit fails before a rebalance occurs or before the consumer is shut down, offsets are reset to the last successful commit and you will likely see duplicates.
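
As an illustration, here is a minimal commit-after-processing sketch (the broker address, group and topic names are placeholder assumptions):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");  // assumption: local broker
props.put("group.id", "my-group");                 // assumption: placeholder group
props.put("enable.auto.commit", "false");          // commit manually, after processing
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic")); // assumption: placeholder topic
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        // handle(record) stands in for your processing logic
    }
    consumer.commitSync(); // if this fails before a rebalance, the batch is re-read: duplicates, not loss
}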

Can a Kafka consumer read from multiple partitions?

Yes. In Kafka a single consumer can read from multiple partitions; the constraint runs the other way: within a consumer group, each partition is consumed by at most one consumer.
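
For example, a single consumer can be manually assigned several partitions ("my-topic" is a placeholder, and a configured consumer is assumed):

// assumes a KafkaConsumer<String, String> consumer built with suitable Properties
consumer.assign(Arrays.asList(
        new TopicPartition("my-topic", 0),
        new TopicPartition("my-topic", 1)));
ConsumerRecords<String, String> records = consumer.poll(100); // may hold records from both partitions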

Is group ID mandatory for Kafka consumer?

Yes, the consumer group.id is mandatory to start a consumer. It plays a major role in scalable message consumption, since partitions are balanced across all consumers that share the same group.id.

What is Max Poll records in Kafka?

max.poll.records controls the maximum number of records returned in a single call to poll(). It is separate from the size-based limit fetch.max.bytes (default 52428800 bytes, i.e. 50 MB), which caps how much data a fetch request returns.


1 Answer

Also try tweaking the following parameters:

  • session.timeout.ms - how many milliseconds Kafka waits without receiving a heartbeat before it considers the consumer "dead"
  • heartbeat.interval.ms - how often the consumer sends heartbeats; it should stay well below session.timeout.ms
  • max.partition.fetch.bytes - limits the amount of data (and therefore the number of messages) the consumer receives per partition when polling

I noticed that rebalancing occurs if the consumer does not get back to Kafka before the session times out. If the commit happens only after the messages are processed, then the time needed to process them determines how these parameters should be set. So decreasing the number of messages fetched per poll and increasing the timeout both help to avoid rebalancing.
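
For instance, a configuration sketch along these lines (the concrete values are illustrative assumptions, not tested recommendations):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");   // assumption: local broker
props.put("group.id", "my-group");                  // assumption: placeholder group
props.put("session.timeout.ms", "30000");           // more time before the consumer is declared dead
props.put("heartbeat.interval.ms", "10000");        // heartbeats well within the session timeout
props.put("max.partition.fetch.bytes", "65536");    // fetch less data per partition per poll
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);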

Also consider using more partitions, so that more threads process your data, even with fewer messages per poll.

I wrote this small application to run some tests. Hope it helps.

https://github.com/ajkret/kafka-sample

UPDATE

Kafka 0.10.x offers a new parameter to control the number of messages received:

  • max.poll.records - the maximum number of records returned in a single call to poll()
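
A sketch of setting it (the value 100 is just an illustration):

props.put("max.poll.records", "100"); // cap records per poll so each batch can be processed before the session times out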

UPDATE

Kafka offers a way to pause the queue. While the queue is paused, you can process the messages in a separate thread, allowing you to keep calling KafkaConsumer.poll() to send heartbeats. Then call KafkaConsumer.resume() after the processing is done. This way you mitigate rebalances caused by missed heartbeats. Here is an outline of what you can do:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Worker pool that processes records off the polling thread
ExecutorService workers = Executors.newFixedThreadPool(4);

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Integer.MAX_VALUE);
    consumer.commitSync(); // note: commits before processing, so a crash skips the batch (at-most-once)

    // Pause all assigned partitions: poll() will then send heartbeats
    // without fetching further records (Collection overload since 0.10.2;
    // earlier clients use the TopicPartition... varargs form)
    consumer.pause(consumer.assignment());
    for (ConsumerRecord<String, String> record : records) {

        Future<Boolean> future = workers.submit(() -> {
            // Process the record here
            return true;
        });

        while (true) {
            try {
                if (future.get(1, TimeUnit.SECONDS) != null) {
                    break;
                }
            } catch (TimeoutException e) {
                // Still processing: poll while paused to send a heartbeat;
                // no records are returned
                consumer.poll(0);
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
    }

    consumer.resume(consumer.assignment());
}
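
Note that pause() only stops fetching; the poll(0) calls while paused are what keep the consumer alive, because in clients before 0.10.1 heartbeats are only sent from within poll(). From 0.10.1 (KIP-62) onward a background thread sends heartbeats, and max.poll.interval.ms bounds the time between calls to poll() instead.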
answered Sep 17 '22 by ajkret