
Kafka keeps rebalancing consumers

We have 10 consumers in a group listening to a topic. The consumers are being rebalanced very often, which completely stops the consumer process for some time.

# ./kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group ParserKafkaPipeline | grep -e ParserBody | sort
ParserBodyToParse 0          99              99              0               consumer-1-f29b7eb7-b871-477c-af52-446fbf4b0496  /10.12.18.58    consumer-1
ParserBodyToParse 1          97              97              0               consumer-10-6639ee02-8e68-40e6-aca1-eabd89bf828e /10.12.18.58    consumer-10
ParserBodyToParse 2          97              97              0               consumer-11-c712db8b-0396-4388-9e3a-e8e342355547 /10.12.18.58    consumer-11
ParserBodyToParse 3          97              98              1               consumer-12-0cc6fe12-d640-4344-91c0-f15e63c20cca /10.12.18.58    consumer-12
ParserBodyToParse 4          97              98              1               consumer-13-b904a958-141d-412e-83ea-950cd51e25e0 /10.12.18.58    consumer-13
ParserBodyToParse 5          97              98              1               consumer-14-7c70ba88-8b8c-4fad-b15b-cf7692a4b9ce /10.12.18.58    consumer-14
ParserBodyToParse 6          98              98              0               consumer-15-f0983c3d-8704-4127-808d-ec8b6b847008 /10.12.18.58    consumer-15
ParserBodyToParse 7          97              97              0               consumer-18-de5d20dd-217c-4db2-9b39-e2fdbca386e9 /10.12.18.58    consumer-18
ParserBodyToParse 8          98              98              0               consumer-5-bdeaf30a-d2bf-4aec-86ea-9c35a7acfe21  /10.12.18.58    consumer-5
ParserBodyToParse 9          98              98              0               consumer-9-4de1bf17-9474-4bd4-ae61-4ab254f52863  /10.12.18.58    consumer-9

# ./kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group ParserKafkaPipeline | grep -e ParserBody | sort
Warning: Consumer group 'ParserKafkaPipeline' is rebalancing.
ParserBodyToParse 0          99              99              0               -               -               -
ParserBodyToParse 1          99              99              0               -               -               -
ParserBodyToParse 2          99              99              0               -               -               -
ParserBodyToParse 3          99              100             1               -               -               -
ParserBodyToParse 4          99              100             1               -               -               -
ParserBodyToParse 5          99              100             1               -               -               -
ParserBodyToParse 6          100             100             0               -               -               -
ParserBodyToParse 7          99              99              0               -               -               -
ParserBodyToParse 8          100             100             0               -               -               -
ParserBodyToParse 9          100             100             0               -               -               -

Notice the warning in the second call above.

Consuming these messages might take a long time, but it shouldn't take more than two minutes. I checked that the limit on consumer.poll (max.poll.interval.ms) is 5 minutes, so that shouldn't be an issue. Are there any logs I can check to see what exactly is happening?

UPDATE:

We use Kafka 2.2.1 and the Java consumer. We didn't change the defaults for the session timeout (session.timeout.ms) or the heartbeat interval (heartbeat.interval.ms). The consumer is basically waiting for IO from another service, so it is not using any CPU; that is why I would expect the heartbeats to be sent correctly.

Our consumer code is the following:

    inline fun <reified T : Any> consume(
            topic: KafkaTopic,
            groupId: String,
            batchSize: Int = 50,
            crossinline consume: (key: String?, value: T) -> (Unit)
    ) = thread {
        val consumerProperties = Properties()
        consumerProperties.putAll(properties)
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
        consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, batchSize)

        val consumer = KafkaConsumer<String?, ByteArray>(consumerProperties)

        consumer.subscribe(listOf(topic.toString()))

        while (true) try {
            // poll blocks until records arrive or the timeout elapses
            val records = consumer.poll(Duration.ofMinutes(pollDurationMinutes))
            log.debug("Topic $topic consumed by group $groupId: ${records.count()} records.")
            // records are processed one by one on this same polling thread
            records.forEach { record -> consumeRecord(record, topic, consume) }
        } catch (e: Exception) {
            log.fatal("Couldn't consume records: ${e.message}.", e)
            // sleep to avoid a logging flood on connection failures
            Thread.sleep(1000)
        }
    }
Asked Jun 18 '19 by Vojtěch


People also ask

How do you prevent rebalancing in Kafka consumer?

This is possible via the consumer configuration group.instance.id. If a consumer restarts for any reason, the group coordinator can assign the same partitions back to it without rebalancing everything.
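
As a rough sketch only (static membership via group.instance.id requires Kafka 2.3 or newer on both brokers and clients, so it would not apply to the 2.2.1 setup in the question; the instance-ID value is just an example), enabling it from Kotlin could look like this:

    import java.util.Properties
    import org.apache.kafka.clients.consumer.ConsumerConfig

    // Sketch: mark the consumer as a static group member so that a restart
    // within session.timeout.ms gets the same partitions back without a full rebalance.
    fun staticMemberProperties(base: Properties, groupId: String, instanceId: String): Properties {
        val props = Properties()
        props.putAll(base)
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
        // group.instance.id must be unique per consumer and stable across restarts,
        // e.g. a hostname or pod name (requires Kafka 2.3+)
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, instanceId)
        return props
    }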

Why is Kafka consumer rebalancing?

A rebalance is triggered when a consumer properly unsubscribes from the topic, when a consumer hasn't polled the topic for a while, when an error inside the consumer application removes it from service, or when latency between the consumer and the cluster extends beyond the configured consumer session timeout.

How often does Kafka rebalance?

During the entire rebalancing process, i.e. as long as the partitions are not reassigned, consumers no longer process any data. By default, the rebalance timeout is fixed at 5 minutes, which can be a very long period during which the growing consumer lag can become an issue.


2 Answers

Frequent rebalances are usually caused by the consumer taking too long to process batches. Because the consumer is processing the batch for a long time (and heartbeats are not being sent), the brokers think the consumer has been lost and start rebalancing.

I would suggest either creating smaller batches by reducing the value of max.partition.fetch.bytes, or extending the heartbeat interval by increasing the value of heartbeat.interval.ms.
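
As a sketch of what that could look like with the question's Kotlin Properties (the values are placeholders, not recommendations, and session.timeout.ms is included only to keep the usual heartbeat-to-session-timeout ratio intact):

    import java.util.Properties
    import org.apache.kafka.clients.consumer.ConsumerConfig

    // Sketch only: example values, tune to your message sizes and processing time.
    fun tuneFetchAndHeartbeat(consumerProperties: Properties) {
        // Smaller fetches per partition => smaller batches returned by poll (default 1 MB)
        consumerProperties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 256 * 1024)
        // Heartbeat less often than the 3000 ms default, but keep heartbeat.interval.ms
        // well below session.timeout.ms (roughly a third of it is the usual guideline)
        consumerProperties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5_000)
        consumerProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15_000)
    }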

Answered Oct 11 '22 by Giorgos Myrianthous


I think the first part of Giorgos' answer is correct, up to "...processing the batch for a long time", but the configuration advice is for a different problem.

There are two causes of a rebalance: too long between polls, or too long between heartbeats. The logs should tell you which one caused the rebalance, but it is usually the former.

If the problem is heartbeats, then the configuration changes advised above may help, as may session.timeout.ms. The heartbeat runs in a separate thread and allows the group to quickly determine whether a consumer application has died.

If the problem is too long between polls and you can't speed up your processing, then you need to increase the allowed gap between calls to poll, or reduce the number of records handled on each poll. The relevant properties are max.poll.interval.ms (default 5 minutes) and max.poll.records (default 500).
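
A sketch of those two knobs against the question's Kotlin consumer properties (the values are placeholders; max.poll.records is what the batchSize parameter in the question already controls):

    import java.util.Properties
    import org.apache.kafka.clients.consumer.ConsumerConfig

    // Sketch only: give slow processing more headroom between polls,
    // and/or hand fewer records to each poll.
    fun allowSlowProcessing(consumerProperties: Properties) {
        // Maximum allowed gap between calls to poll() before the member is
        // considered failed and its partitions are rebalanced (default 300000 ms)
        consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000)
        // Fewer records per poll means each batch finishes sooner (default 500)
        consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 25)
    }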

Answered Oct 11 '22 by Chris