We are running a 3-broker Kafka 0.10.0.1 cluster. We have a Java app which spawns many consumer threads, each consuming from a different topic. For every topic we have specified a different consumer group.
A lot of the time I see that whenever this application is restarted, one or more CGs take more than 5 minutes to receive a partition assignment. Until then, the consumers for that topic don't consume anything. If I go to a Kafka broker, run consumer-groups.sh, and describe that particular CG, I see that it is rebalancing. In server.log I see lines like these:
Preparing to stabilize group otp-sms-consumer
Stabilized group otp-sms-consumer
Between these two log lines there is usually a gap of about 5 minutes or more. On the consumer side, when I turn on trace-level logs, there is literally no activity during this pause. After a couple of minutes a lot of activity starts. The topic holds time-critical data (OTP SMS), so we cannot tolerate such long delays. What can be the reason for such long rebalances?
Here's our consumer config:
auto.commit.interval.ms = 3000
auto.offset.reset = latest
bootstrap.servers = [x.x.x.x:9092, x.x.x.x:9092, x.x.x.x:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = otp-notifications-consumer
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 50
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = SSL
send.buffer.bytes = 131072
session.timeout.ms = 300000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = /x/x/client.truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
Please help.
There are several triggers for a consumer group rebalance: a new consumer joins the group, an existing consumer leaves the group, or the broker thinks a consumer may have failed. Beyond these, any other situation that requires resources to be reassigned will also trigger a rebalance.
If far too many producers are writing data to the same topic while only a limited number of consumers is reading it, the reading processes will always be slow.
A higher number of partitions leads to lower throughput per single partition. On the other hand, with a higher number of app instances, restarting a single one leads to less Kafka lag during rebalancing.
Increasing the number of partitions and the number of brokers in a cluster will lead to increased parallelism of message consumption, which in turn improves the throughput of a Kafka cluster; however, the time required to replicate data across replica sets will also increase.
As explained previously, Kafka starts a rebalance as soon as a consumer joins the group. After that rebalance finishes, the coordinator notices that more consumers have joined and starts another one. This cycle repeats until all consumers have joined the group successfully.
A consumer is a process that reads from a Kafka topic and processes the messages. A topic may contain multiple partitions. A partition is owned by a broker (in a clustered environment). A consumer group may contain multiple consumers, and within a group a given message is consumed by only one consumer.
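To make the terminology concrete, here is a minimal sketch of one consumer participating in a group; the broker list and topic name are placeholders, and the security settings from the question are omitted:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class OtpTopicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "x.x.x.x:9092");      // placeholder broker list
        props.put("group.id", "otp-notifications-consumer"); // one group per topic, as in the question
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("otp-sms")); // assumed topic name

        while (true) {
            // Each poll() also drives group membership: joining the group and
            // receiving partition assignments (and, before 0.10.1, heartbeating).
            ConsumerRecords<String, String> records = consumer.poll(100); // poll(long) on 0.10.x
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition=%d offset=%d value=%s%n",
                        record.partition(), record.offset(), record.value());
            }
        }
    }
}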
It could be that a crash-inducing message was delivered to the blocked consumer. Try spamming some random messages through kafka-console-producer: since the consumer group is not rebalancing, the crashing consumer reads the poison message repeatedly and restarts multiple times.
The rebalance timeout is equal to max.poll.interval.ms (5 minutes in your case). When a rebalance starts in a group, Kafka revokes the partitions of all consumers in that group, then waits for all alive consumers (consumers that are still sending heartbeats) to call poll() and send a JoinGroupRequest.
This waiting ends either when the rebalance timeout expires or when every alive consumer has called poll(), at which point Kafka assigns partitions to those consumers.
So in your case you probably have a long-running process in one of your consumers, and Kafka waits for that processing to complete before assigning partitions.
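One way to check this is to time each batch between poll() calls; if processing regularly approaches the max.poll.interval.ms budget, the consumer cannot rejoin the group promptly during a rebalance. A minimal sketch, where the half-budget warning threshold and the handle() helper are assumptions:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

final class BatchTimer {
    // Sketch: log any batch whose processing time approaches max.poll.interval.ms,
    // because such batches delay the JoinGroupRequest during a rebalance.
    // The half-of-budget threshold is an arbitrary assumption.
    static void pollLoop(KafkaConsumer<String, String> consumer, long maxPollIntervalMs) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100); // poll(long) on 0.10.x
            long start = System.currentTimeMillis();
            for (ConsumerRecord<String, String> record : records) {
                handle(record);
            }
            long elapsed = System.currentTimeMillis() - start;
            if (elapsed > maxPollIntervalMs / 2) {
                System.err.printf("Slow batch: %d ms for %d records%n", elapsed, records.count());
            }
        }
    }

    private static void handle(ConsumerRecord<String, String> record) {
        // Application-specific work; if this is slow, consider lowering
        // max.poll.records or moving the work to a separate thread pool.
    }
}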
For more information you can check these:
Consumer groups are an essential mechanism of Kafka. They allow consumers to share load and elastically scale by dynamically assigning the partitions of topics to consumers. In our current model of consumer groups, whenever a rebalance happens every consumer from that group experiences downtime: their poll() calls block until every other consumer in the group calls poll(). That is due to the fact that every consumer needs to call JoinGroup in a rebalance scenario in order to confirm it is still in the group.
Today, if the client has configured max.poll.interval.ms to a large value, the group coordinator broker will take in an unlimited number of join group requests and the rebalance could therefore continue for an unbounded amount of time. (https://cwiki.apache.org/confluence/display/KAFKA/KIP-389%3A+Introduce+a+configurable+consumer+group+size+limit)
Since we give the client as much as max.poll.interval.ms to handle a batch of records, this is also the maximum time before a consumer can be expected to rejoin the group in the worst case. We therefore propose to set the rebalance timeout in the Java client to the same value configured with max.poll.interval.ms. When a rebalance begins, the background thread will continue sending heartbeats. The consumer will not rejoin the group until processing completes and the user calls poll(). From the coordinator's perspective, the consumer will not be removed from the group until either 1) their session timeout expires without receiving a heartbeat, or 2) the rebalance timeout expires.
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread)
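In practice this means session.timeout.ms no longer needs to cover processing time, because heartbeats come from the background thread. A hedged tuning sketch for the Java client follows; the values are illustrative assumptions, not recommendations from the KIP:

import java.util.Properties;

final class RebalanceTimeoutTuning {
    // Illustrative values only: keep liveness detection fast while giving
    // record processing its own, separate budget.
    static Properties timeouts() {
        Properties props = new Properties();
        props.put("session.timeout.ms", "10000");   // liveness via background heartbeats; 300000 in the question is unusually high
        props.put("heartbeat.interval.ms", "3000"); // typically about a third of session.timeout.ms
        props.put("max.poll.interval.ms", "60000"); // processing budget; under KIP-62 this is also the rebalance timeout
        props.put("max.poll.records", "50");        // smaller batches keep each poll() cycle short
        return props;
    }
}

With settings like these, a genuinely dead member is detected within seconds, while slow processors still get up to max.poll.interval.ms to rejoin during a rebalance.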
I suspect your client version is at least 0.10.1.0, since I see max.poll.interval.ms in your consumer configuration, and that property was introduced in that version. Kafka 0.10.1.0 integrates KIP-62, which introduces a rebalance timeout equal to max.poll.interval.ms, with a default value of 5 minutes.
I guess that if you don't want to wait for the timeout to expire during a rebalance, your consumers need to leave the consumer group cleanly by calling the close() method.
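A common way to do this in the Java client is to interrupt a blocked poll() with wakeup() from a shutdown hook, then close() the consumer in a finally block. A minimal sketch; the broker address and topic name are placeholder assumptions, and the SSL settings from the question are omitted:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class GracefulShutdownConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "x.x.x.x:9092"); // placeholder
        props.put("group.id", "otp-notifications-consumer");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup(); // the only KafkaConsumer method safe to call from another thread
            try {
                mainThread.join(); // let the poll loop finish close() before the JVM exits
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        try {
            consumer.subscribe(Collections.singletonList("otp-sms")); // assumed topic name
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100); // poll(long) on 0.10.x
                records.forEach(r -> System.out.println(r.value())); // application work goes here
            }
        } catch (WakeupException e) {
            // Expected on shutdown: wakeup() makes the blocked poll() throw.
        } finally {
            // close() sends a LeaveGroupRequest, so the group rebalances immediately
            // instead of waiting for this member's timeout to expire.
            consumer.close();
        }
    }
}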