
Kafka stops consuming messages from newly assigned partitions after rebalancing

I'm quite new to Kafka (and to English...). I'm facing this issue and couldn't find any solution by googling.

I use Spring Boot with spring-kafka support. I have installed kafka_2.11- on my local machine (with only one broker, broker 0).

s1. I create the topic with:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic tracking

My consumer config (application.properties):


s2. Then I start 3 consumers, changing 'kafka.client.id' for each one and running the Spring Boot main class. In the Eclipse console I can check the partition assignment:

client-1: partitions assigned:[tracking-4, tracking-3]
client-2: partitions assigned:[tracking-2, tracking-1]
client-3: partitions assigned:[tracking-0]

s3. I start the producer to send 20 messages to the topic; each consumer starts consuming messages from its assigned partitions.

s4. I close consumer 1 and Kafka rebalances automatically. The new partition assignment:

client-1: partitions assigned:[]
client-2: partitions assigned:[tracking-2, tracking-1, tracking-0]
client-3: partitions assigned:[tracking-4, tracking-3]
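The post-rebalance split above is what a range-style assignment produces. Kafka's default assignor at the time (RangeAssignor) orders the group members and gives each one partitions / consumers contiguous partitions, with the first partitions % consumers members getting one extra. The sketch below is a simplified, stdlib-only model of that arithmetic, not Kafka's implementation; the class name is hypothetical and it assumes the consumers are already in the assignor's order. For 5 partitions and 2 consumers it gives [0, 1, 2] and [3, 4], the same split as client-2 and client-3 above; for 3 consumers it gives sizes 2, 2 and 1, which matches the initial counts, although the partition ids reported there differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of range-style assignment for a single topic. */
final class RangeAssignmentSketch {

    private RangeAssignmentSketch() {
    }

    /** Splits partitions 0..numPartitions-1 into contiguous ranges, one per consumer (in assignor order). */
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        if (numConsumers <= 0) {
            throw new IllegalArgumentException("need at least one consumer");
        }
        int perConsumer = numPartitions / numConsumers;
        int extra = numPartitions % numConsumers; // the first 'extra' consumers get one more partition
        List<List<Integer>> result = new ArrayList<>();
        int next = 0;
        for (int c = 0; c < numConsumers; c++) {
            int count = perConsumer + (c < extra ? 1 : 0);
            List<Integer> range = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                range.add(next++);
            }
            result.add(range);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(5, 3)); // [[0, 1], [2, 3], [4]]
        System.out.println(assign(5, 2)); // [[0, 1, 2], [3, 4]]
    }
}
```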

s5. I found that messages on partition 'tracking-3' are not consumed!

The issue can be reproduced every time: some messages in the newly assigned partitions are always lost. Do you have any suggestions? Please help me, thanks.

rellocs wood asked Apr 14 '17 04:04


1 Answer

I reproduced it. It looks like a problem in Kafka itself (with enable.auto.commit=true): on the rebalance, Kafka reports the "position" of the unread partitions (the offset of the next record that will be fetched, if a record with that offset exists) as the end of the partition.
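The key distinction is between the consumer's position (the next offset it will fetch) and the offset just after the last record the listener has actually finished. A commit taken from the position also covers records that were fetched but not yet processed; if the partition then moves to another consumer, that consumer resumes at the committed offset and never sees them. The following is a minimal stdlib-only model of that effect, not Kafka's consumer code; the class, field and method names are hypothetical and the offsets are illustrative.

```java
/** Hypothetical model of one partition's offsets as seen by a consumer group. */
final class PartitionOffsetsModel {

    final long logEndOffset; // offset the next produced record would get
    long position;           // next offset the consumer will fetch
    long processedUpTo;      // offset just after the last record the listener finished
    long committed;          // what the group has committed (CURRENT-OFFSET)

    PartitionOffsetsModel(long startOffset, long logEndOffset) {
        this.logEndOffset = logEndOffset;
        this.position = startOffset;
        this.processedUpTo = startOffset;
        this.committed = startOffset;
    }

    /** Fetching advances the position only; nothing is processed yet. */
    void fetch(int maxRecords) {
        position = Math.min(logEndOffset, position + maxRecords);
    }

    /** The listener finishes n of the fetched records (never beyond the position). */
    void process(int n) {
        processedUpTo = Math.min(position, processedUpTo + n);
    }

    void commitPosition() { committed = position; }       // also marks fetched-but-unprocessed records as done
    void commitProcessed() { committed = processedUpTo; } // marks only processed records as done

    /** Records a new owner resuming at the committed offset would never see. */
    long skippedOnHandover() {
        return Math.max(0, committed - processedUpTo);
    }

    public static void main(String[] args) {
        // Illustrative numbers: 4 unread records (offsets 36..39), log end 40.
        PartitionOffsetsModel byPosition = new PartitionOffsetsModel(36, 40);
        byPosition.fetch(500);       // all 4 records are fetched into memory
        byPosition.commitPosition(); // the listener has not processed any of them yet
        System.out.println(byPosition.committed + " " + byPosition.skippedOnHandover()); // 40 4

        PartitionOffsetsModel byProcessed = new PartitionOffsetsModel(36, 40);
        byProcessed.fetch(500);
        byProcessed.process(2);      // the listener finished offsets 36 and 37
        byProcessed.commitProcessed();
        System.out.println(byProcessed.committed + " " + byProcessed.skippedOnHandover()); // 38 0
    }
}
```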

In fact, when I use the kafka-consumer-groups tool, the offsets of the unread partitions are already at the "end". When I run it with just one consumer, while it is reading the first partition, I see...

$ kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group so43405009

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
tracking                       0          37              40              3          client1-8129bb3d-3a83-4c83-9128-3a2762ede758      /                      client1
tracking                       1          40              40              0          client1-8129bb3d-3a83-4c83-9128-3a2762ede758      /                      client1
tracking                       2          40              40              0          client1-8129bb3d-3a83-4c83-9128-3a2762ede758      /                      client1
tracking                       3          40              40              0          client1-8129bb3d-3a83-4c83-9128-3a2762ede758      /                      client1
tracking                       4          40              40              0          client1-8129bb3d-3a83-4c83-9128-3a2762ede758      /                      client1

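Notice the CURRENT-OFFSET column: partitions 1-4 have not been read yet, but their committed offset is already 40, the log end, so their LAG is 0.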
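LAG in these tables is LOG-END-OFFSET minus CURRENT-OFFSET, so a lag of 0 on a partition nobody has read yet means a consumer resuming at the committed offset would find nothing to read there. A small stdlib-only sketch that recomputes the lag from rows shaped like the describe output and lists those partitions; the class and method names are hypothetical, and it assumes every row has a numeric committed offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper for rows shaped like the kafka-consumer-groups --describe output above. */
final class ConsumerGroupLag {

    private ConsumerGroupLag() {
    }

    /** LAG = LOG-END-OFFSET - CURRENT-OFFSET (columns 4 and 3 of a row). */
    static long lag(String row) {
        String[] fields = row.trim().split("\\s+");
        long currentOffset = Long.parseLong(fields[2]);
        long logEndOffset = Long.parseLong(fields[3]);
        return logEndOffset - currentOffset;
    }

    /** Partitions whose committed offset already equals the log end. */
    static List<Integer> nothingLeftToConsume(List<String> rows) {
        List<Integer> partitions = new ArrayList<>();
        for (String row : rows) {
            if (lag(row) == 0) {
                partitions.add(Integer.parseInt(row.trim().split("\\s+")[1]));
            }
        }
        return partitions;
    }

    public static void main(String[] args) {
        // First five columns of the first table: TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
        List<String> rows = Arrays.asList(
                "tracking 0 37 40 3",
                "tracking 1 40 40 0",
                "tracking 2 40 40 0",
                "tracking 3 40 40 0",
                "tracking 4 40 40 0");
        System.out.println(lag(rows.get(0)));           // 3
        System.out.println(nothingLeftToConsume(rows)); // [1, 2, 3, 4]
    }
}
```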

On the next run I ran the tool twice: once while the first partition was being processed, and again a bit later...

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
tracking                       0          41              44              3          client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8      /                      client1
tracking                       1          44              44              0          client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8      /                      client1
tracking                       2          44              44              0          client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8      /                      client1
tracking                       3          44              44              0          client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8      /                      client1
tracking                       4          44              44              0          client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8      /                      client1


TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
tracking                       0          44              44              0          client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8      /                      client1
tracking                       1          44              44              0          client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8      /                      client1
tracking                       2          41              44              3          client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8      /                      client1
tracking                       3          44              44              0          client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8      /                      client1
tracking                       4          44              44              0          client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8      /                      client1

See how the current offset of partition 2 went down from 44 to 41.
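Kafka stores only the latest committed offset for each group and partition and accepts a commit that is lower than the previous one. So when two parties commit for the same partition (for example, an automatic commit of the consumer's position and a separate commit of only the processed records), the value the tool reports can move backwards, as it does for partition 2 here. A minimal model of that last-write-wins behavior; the class name is hypothetical and the offsets are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of a group's committed offsets: the last write wins per partition. */
final class CommittedOffsetStore {

    private final Map<Integer, Long> committed = new HashMap<>();

    /** Stores the offset as-is; there is no check that it is >= the previous commit. */
    void commit(int partition, long offset) {
        committed.put(partition, offset);
    }

    /** Returns the committed offset, or null if nothing was committed for the partition. */
    Long current(int partition) {
        return committed.get(partition);
    }

    public static void main(String[] args) {
        CommittedOffsetStore store = new CommittedOffsetStore();
        store.commit(2, 44); // e.g. a commit of the consumer's position (all fetched records)
        System.out.println(store.current(2)); // 44
        store.commit(2, 41); // e.g. a later commit covering only the processed records
        System.out.println(store.current(2)); // 41
    }
}
```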

Disabling auto-commit solved it for me...



TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
tracking                       0          52              52              0          client1-59413599-81e8-49dd-bbd7-8a62152f11e5      /                      client1
tracking                       1          49              52              3          client1-59413599-81e8-49dd-bbd7-8a62152f11e5      /                      client1
tracking                       2          49              52              3          client2-edfe34f9-08d5-4825-80d0-4a6cf9526e42      /                      client2
tracking                       3          48              52              4          client2-edfe34f9-08d5-4825-80d0-4a6cf9526e42      /                      client2
tracking                       4          51              52              1          client3-20da8742-af38-403e-b125-5d0c7c771319      /                      client3

Here is my test program:

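import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.config.ContainerProperties; // spring-kafka 1.x location; newer versions use org.springframework.kafka.listener.ContainerProperties

@SpringBootApplication
public class So43405009Application implements CommandLineRunner {
    public static void main(String[] args) {
        SpringApplication.run(So43405009Application.class, args);
    }
    @Autowired
    private KafkaTemplate<String, String> template; // sendDefault() needs a default topic ("tracking") configured
    @Value("${kafka.client.id}") // assumed key: the per-instance client id mentioned in the question
    private String clientId;
    @Override
    public void run(String... args) throws Exception {
        if (this.clientId.endsWith("1")) { // only the first instance produces the 20 test messages
            for (int i = 0; i < 20; i++) {
                this.template.sendDefault("foo" + i);
            }
        }
    }
    @Bean
    public KafkaMessageListenerContainer<String, String> container(ConsumerFactory<String, String> cf) {
        ContainerProperties containerProperties = new ContainerProperties("tracking");
        containerProperties.setMessageListener((MessageListener<?, ?>) d -> {
            try {
                Thread.sleep(5_000); // assumed delay: a slow listener keeps fetched records pending during a rebalance
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return new KafkaMessageListenerContainer<>(cf, containerProperties);
    }
}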


with properties


I see the same results with as well.


It turns out to be a spring-kafka bug; it works with auto-commit enabled, but you have to enable it explicitly:


Otherwise the container assumes it's false, which causes the strange behavior above; it looks like the client doesn't like the consumer's commit method being called when auto-commit is enabled (see #288).
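The update comes down to two components reading the same setting with different defaults: the Kafka consumer treats an absent enable.auto.commit as true, while, according to the answer, the container treated an absent value as false and therefore committed offsets itself as well. The sketch is a hypothetical illustration of that mismatch (class and method names invented, not spring-kafka's code); it shows why setting the property explicitly makes both sides agree.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration: two components read one property with different defaults. */
final class AutoCommitDefaults {

    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    private AutoCommitDefaults() {
    }

    /** The Kafka consumer's view: an absent value means true. */
    static boolean clientAutoCommits(Map<String, Object> props) {
        return Boolean.parseBoolean(String.valueOf(props.getOrDefault(ENABLE_AUTO_COMMIT, "true")));
    }

    /** The container's view as described in the answer: an absent value means false. */
    static boolean containerAssumesAutoCommit(Map<String, Object> props) {
        return Boolean.parseBoolean(String.valueOf(props.getOrDefault(ENABLE_AUTO_COMMIT, "false")));
    }

    /** True when both the client (auto-commit) and the container (its own commits) would commit. */
    static boolean doubleCommits(Map<String, Object> props) {
        return clientAutoCommits(props) && !containerAssumesAutoCommit(props);
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        System.out.println(doubleCommits(props)); // true: property not set
        props.put(ENABLE_AUTO_COMMIT, true);
        System.out.println(doubleCommits(props)); // false: set explicitly to true
        props.put(ENABLE_AUTO_COMMIT, false);
        System.out.println(doubleCommits(props)); // false: only the container commits
    }
}
```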

I would generally recommend setting it to false and choosing one of the container's AckModes instead; e.g. RECORD commits after every record, and BATCH (the default) commits after each batch of records received by a poll.
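With auto-commit off, the container decides when to commit. A stdlib-only sketch of where the two AckModes mentioned above place commits for one partition, given the records returned by successive polls: RECORD commits after each record, BATCH once all records from a poll have been processed. The committed value is the offset of the next record to read (last processed offset + 1). This is a simplified model with hypothetical names, not spring-kafka's container, which commits per partition and also offers other modes such as TIME and MANUAL.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of commit points for one partition under two ack modes. */
final class AckModeSketch {

    enum Mode { RECORD, BATCH }

    private AckModeSketch() {
    }

    /** Each inner list holds the offsets returned by one poll, in order; returns the committed offsets. */
    static List<Long> commits(List<List<Long>> polls, Mode mode) {
        List<Long> result = new ArrayList<>();
        for (List<Long> batch : polls) {
            for (long offset : batch) {
                // the listener would process the record here
                if (mode == Mode.RECORD) {
                    result.add(offset + 1); // commit right after this record
                }
            }
            if (mode == Mode.BATCH && !batch.isEmpty()) {
                result.add(batch.get(batch.size() - 1) + 1); // commit once the whole poll is processed
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Long>> polls = Arrays.asList(Arrays.asList(40L, 41L, 42L), Arrays.asList(43L));
        System.out.println(commits(polls, Mode.RECORD)); // [41, 42, 43, 44]
        System.out.println(commits(polls, Mode.BATCH));  // [43, 44]
    }
}
```

RECORD gives the smallest window of uncommitted work at the cost of a commit per record; BATCH commits less often but may redeliver up to one poll's worth of records after a failure.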

Gary Russell answered Nov 03 '22 09:11