Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Kafka Consumer error: Marking coordinator dead

Tags:

apache-kafka

I have a topic with 10 partitions in Kafka 0.10.0.1 cluster. I have an application that spawns multiple consumer threads. For this topic I am spawning 5 threads. Many times in my application logs I am seeing this entry

INFO :: AbstractCoordinator:600 - Marking the coordinator x.x.x.x:9092
(id:2147483646 rack: null) dead for group notifications-consumer

Then there are several entries saying (Re-)joining group notifications-consumer. Afterwards I also see one warning saying

Auto commit failed for group notifications-consumer: Commit cannot be completed since
the group has already rebalanced and assigned the partitions to another member. This means
that the time between subsequent calls to poll() was longer than the configured
max.poll.interval.ms, which typically implies that the poll loop is spending too much time 
message processing. You can address this either by increasing the session timeout
or by reducing the maximum size of batches returned by poll() with max.poll.records.

Now I have already adjusted my consumer config like so

props.put("max.poll.records", 200);
props.put("heartbeat.interval.ms", 20000);
props.put("session.timeout.ms", 60000);

So, even after properly adjusting the config I am still getting this error. During the rebalance our app is completely unresponsive. Please help.

like image 513
Shades88 Avatar asked May 15 '18 13:05

Shades88


People also ask

What happens if Kafka consumer dies?

When one consumer dies Kafka needs to reassign orphaned partitions to the rest of the consumers. Similarly, when a new consumer joins the group Kafka needs to free up some partitions and assign them to the new consumers (if it can).

What happens if Kafka consumer fails?

If the consumer fails after writing the data to the database but before saving the offsets back to Kafka, it will reprocess the same records next time it runs and save them to the database once more.

What is consumer coordinator in Kafka?

The consumer group coordinator is one of the brokers while the group leader is one of the consumer in a consumer group. The group coordinator is nothing but one of the brokers which receives heartbeats (or polling for messages) from all consumers of a consumer group. Every consumer group has a group coordinator.

How do I know if Kafka consumer is running?

You can use consumer. assignment() , it will return set of partitions and verify whether all of the partitions are assigned which are available for that topic.


1 Answers

With session.timeout.ms you only control the timeouts due to heartbeats, this means that has passed session.timeout.ms milliseconds since the last heartbeat and the cluster declares you as a dead node and triggers a rebalance.

Before KIP-62 the heartbeat was sent within the poll but now is moved to a specific background thread to avoid being evicted from the cluster if you were taking more time than session.timeout.ms to call another poll(). Separating the heartbeat to a specific thread decouples the processing from telling the cluster that you are up and running, but this introduced the risk of "livelock" situations in which the process is alive, but is not making progress, so besides making the heartbeat independent of the poll a new timeout was introduced to ensure that the consumer was alive and making progress. The documentation says these about the implementation pre KIP-62:

As long as the consumer is sending heartbeats, it basically holds a lock on the partitions it was assigned. If the process becomes defunct in such a way that it cannot make progress but is nevertheless continuing to send heartbeats, then no other member in the group will be able to take over the partitions, which causes increasing lag. The fact that heartbeating and processing is all done in the same thread, however, guarantees that consumers must make progress to keep their assignment. Any stall which affects processing also affects heartbeats.

The changes introduced by the KIP-62 includes:

Decoupling the processing timeout: We propose to introduce a separate locally enforced timeout for record processing and a background thread to keep the session active until this timeout expires. We call this new timeout as the "process timeout" and expose it in the consumer's configuration as max.poll.interval.ms. This config sets the maximum delay between client calls to poll()

From the logs you posted I think you may be in this situation, your app is taking more time than max.poll.interval.ms (5 min by default) to process the 200 polled records. If you are in this scenario you could only reduce even more the max.poll.records or increase the max.poll.interval.ms.

PD:

The max.poll.interval.ms configuration that appears on your log is from (at least) kafka 0.10.1.0 so I assume you make a little mistake there.

Update

Correct me if I understood you wrong but in your last comment you were saying that you are creating 25 consumers (e.g. 25 org.apache.kafka.clients.consumer.KafkaConsumer if you were using java) and suscribing them to N different topics but using the same group.id. If this is correct, you will see rebalacing each time a KafkaConsumer is started or stopped because it will send a JoinGroup or LeaveGroup message (see the corresponding kafka protocol) that contains group.id and member.id (the member.id is not the host so two consumers created in the same process will still have different ids). Note that these message doesn't contain topic subscription information (although that information should be in the brokers but kafka doesn't use it for rebalancing). So each time the cluster receives a JoinGroup or a LeaveGroup for group.id X, it will trigger a rebalance for all consumers with the same group.id X.

If you start 25 consumers with the same group.id you will see rebalancing until the last consumer is created and the corresponding rebalancing ends (if you continue seeing this you may be stopping consumers).

I had this issue a couple months ago.

If we have two KafkaConsumer using the same group.id (running in the same process or in two different processes) and one of them is closed, it triggers a rebalance in the other KafkaConsumer even if they were subscribed to different topics. I suppose that brokers must be taking into account only the group.id for a rebalance and not the subscribed topics corresponding to the pair (group_id,member_id) of the LeaveGroupRequest but I'm wondering if this is the expected behavior or it's something that should be improved? I guess that is probably the first option to avoid a more complex rebalance in the broker and considering that the solution is very simple, i.e. just use different group ids for different KafkaConsumer that subscribe to different topics even if they are running in the same process.


When rebalance occurs we see duplicate messages coming

This is the expeceted behaviour, one consumer consumes the message but before commiting the offset a rebalance was triggered and the commit fails. When the rebalance finished the process that will have that topic assignment will consume the message again (until commit success).

I segregated into two groups, now suddenly problem has disappeared since past 2 hours.

You hit the nail on the head here, but if you don't want to see any (avoidable) rebalancing you should use a different group.id for each topic.

Here is a great talk about different rebalancing scenarios.

like image 64
gabrielgiussi Avatar answered Nov 07 '22 09:11

gabrielgiussi