Kafka stops consuming messages from newly assigned partitions after rebalancing

I'm quite new to Kafka (and to English), and I'm facing an issue I haven't been able to find a solution for.

I use Spring Boot with spring-kafka support, and I have installed kafka_2.11-0.10.1.1 on my local machine (with only one broker, 0).

s1. First I create the topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic tracking

My consumer config in application.properties:

kafka.servers.bootstrap=localhost:9092 
kafka.topic.tracking=tracking
kafka.group.id=trackingGroup
kafka.client.id=client-1

s2. Then I start up 3 consumers by changing 'kafka.client.id' and running the Spring Boot main class. In the Eclipse console I can check the partition assignments:

client-1: partitions assigned:[tracking-4, tracking-3]
client-2: partitions assigned:[tracking-2, tracking-1]
client-3: partitions assigned:[tracking-0]

s3. I start the producer to send 20 messages to the topic; each consumer starts consuming messages from its assigned partitions.

s4. I shut down consumer 1. Kafka does the rebalancing automatically; the new partition assignment is:

client-1: partitions assigned:[]
client-2: partitions assigned:[tracking-2,tracking-1, tracking-0]
client-3: partitions assigned:[tracking-4,tracking-3]

s5. I found that the messages on partition 'tracking-3' are not consumed!

The issue can be reproduced every time: some messages in the newly assigned partitions are always lost. Do you have any suggestions? Please help, thanks.

Asked Apr 14 '17 by rellocs wood




1 Answer

I reproduced it; it looks like a problem in Kafka itself. With enable.auto.commit=true, on the rebalance Kafka reports the "position" of the unread partitions (the offset of the *next record* that will be fetched, if a record with that offset exists) as the end of the partition.
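
To see why that loses messages, here is a toy model (plain Java, not the Kafka API; the offsets are illustrative, taken from the first table below) of committing the fetch position instead of the last processed offset:

```java
// Toy model of the failure mode: the consumer has *fetched* ahead of what it
// has *processed*. If a rebalance commits the fetch position, the partition's
// new owner resumes past records that were never processed.
public class PositionCommitDemo {

    public static void main(String[] args) {
        long nextToProcess = 37;  // offsets 0..36 have been fully handled
        long fetchPosition = 40;  // offsets 37..39 are fetched but unprocessed

        // Buggy: committing the fetch position skips the in-flight records.
        long buggyCommit = fetchPosition;
        System.out.println("records skipped: " + (buggyCommit - nextToProcess)); // 3

        // Correct: commit only up to the last fully processed record.
        long correctCommit = nextToProcess;
        System.out.println("records skipped: " + (correctCommit - nextToProcess)); // 0
    }
}
```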

In fact, when I use the kafka-consumer-groups tool, the offsets of the unread partitions are already at the "end". When I run it with just one consumer, while it is reading the first partition, I see...

$ kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group so43405009

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
tracking                       0          37              40              3          client1-8129bb3d-3a83-4c83-9128-3a2762ede758      /10.0.0.6                      client1
tracking                       1          40              40              0          client1-8129bb3d-3a83-4c83-9128-3a2762ede758      /10.0.0.6                      client1
tracking                       2          40              40              0          client1-8129bb3d-3a83-4c83-9128-3a2762ede758      /10.0.0.6                      client1
tracking                       3          40              40              0          client1-8129bb3d-3a83-4c83-9128-3a2762ede758      /10.0.0.6                      client1
tracking                       4          40              40              0          client1-8129bb3d-3a83-4c83-9128-3a2762ede758      /10.0.0.6                      client1

Notice the CURRENT-OFFSET column.
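
For reference, the LAG column is simply LOG-END-OFFSET minus CURRENT-OFFSET; a minimal sketch of the per-partition arithmetic the tool performs, using the values from the table above:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LagSketch {

    // Lag per partition = log end offset - committed (current) offset.
    static long lag(long logEndOffset, long currentOffset) {
        return logEndOffset - currentOffset;
    }

    public static void main(String[] args) {
        // {partition -> [current offset, log end offset]} from the output above.
        Map<Integer, long[]> partitions = new LinkedHashMap<>();
        partitions.put(0, new long[] {37, 40});
        partitions.put(1, new long[] {40, 40});
        partitions.forEach((p, o) ->
                System.out.println("tracking-" + p + " lag=" + lag(o[1], o[0])));
        // prints: tracking-0 lag=3
        //         tracking-1 lag=0
    }
}
```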

On the next run I ran it twice: once while the first partition was being processed, and again a bit later...

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
tracking                       0          41              44              3          client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8      /10.0.0.6                      client1
tracking                       1          44              44              0          client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8      /10.0.0.6                      client1
tracking                       2          44              44              0          client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8      /10.0.0.6                      client1
tracking                       3          44              44              0          client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8      /10.0.0.6                      client1
tracking                       4          44              44              0          client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8      /10.0.0.6                      client1

and

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
tracking                       0          44              44              0          client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8      /10.0.0.6                      client1
tracking                       1          44              44              0          client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8      /10.0.0.6                      client1
tracking                       2          41              44              3          client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8      /10.0.0.6                      client1
tracking                       3          44              44              0          client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8      /10.0.0.6                      client1
tracking                       4          44              44              0          client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8      /10.0.0.6                      client1

See how the current offset of partition 2 went down from 44 to 41.

Disabling auto-commit solved it for me...

spring.kafka.consumer.enable-auto-commit=false

...

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
tracking                       0          52              52              0          client1-59413599-81e8-49dd-bbd7-8a62152f11e5      /10.0.0.6                      client1
tracking                       1          49              52              3          client1-59413599-81e8-49dd-bbd7-8a62152f11e5      /10.0.0.6                      client1
tracking                       2          49              52              3          client2-edfe34f9-08d5-4825-80d0-4a6cf9526e42      /10.0.0.6                      client2
tracking                       3          48              52              4          client2-edfe34f9-08d5-4825-80d0-4a6cf9526e42      /10.0.0.6                      client2
tracking                       4          51              52              1          client3-20da8742-af38-403e-b125-5d0c7c771319      /10.0.0.6                      client3

Here is my test program:

@SpringBootApplication
public class So43405009Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(So43405009Application.class, args);
    }

    @Autowired
    private KafkaTemplate<String, String> template;

    @Value("${spring.kafka.consumer.client-id}")
    private String clientId;

    @Override
    public void run(String... args) throws Exception {
        if (this.clientId.endsWith("1")) {
            for (int i = 0; i < 20; i++) {
                this.template.sendDefault("foo" + i);
            }
        }
    }

    @Bean
    public KafkaMessageListenerContainer<String, String> container(ConsumerFactory<String, String> cf) {
        ContainerProperties containerProperties = new ContainerProperties("tracking");
        containerProperties.setMessageListener((MessageListener<?, ?>) d -> {
            System.out.println(d);
            try {
                Thread.sleep(5_000);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(cf,
                containerProperties);
        return container;
    }

}

with properties

spring.kafka.listener.ack-mode=record
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=so43405009
spring.kafka.consumer.client-id=client1
spring.kafka.template.default-topic=tracking

I see the same results with 0.10.2.0 as well.

EDIT

It turns out to be a spring-kafka bug; it works with auto-commit enabled, but you have to explicitly enable it

spring.kafka.consumer.enable-auto-commit=true

Otherwise the container assumes it is false, which causes the strange behavior above; it looks like the client doesn't like the consumer's commit method being called when auto-commit is enabled. See spring-kafka issue #288.

I would generally recommend setting it to false and choosing one of the container's AckModes instead; e.g. RECORD commits after every record, BATCH after every batch received by a poll() (the default).
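
Assuming Spring Boot's standard property names, that recommendation corresponds to a config like this (RECORD shown; the batch alternative is commented out):

```properties
# Let the listener container, not the kafka-clients auto-committer, manage offsets.
spring.kafka.consumer.enable-auto-commit=false
# Commit after each record...
spring.kafka.listener.ack-mode=record
# ...or after each batch returned by poll() (the container default):
# spring.kafka.listener.ack-mode=batch
```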

Answered Nov 03 '22 by Gary Russell