I'm quite new to Kafka (and to English), and I'm facing an issue for which I could not find a solution on Google.
I use Spring Boot with spring-kafka, and I have installed kafka_2.11-0.10.1.1 on my local machine (with only one broker, broker 0).
s1. Then I create the topic with:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic tracking
My consumer config (application.properties):
kafka.servers.bootstrap=localhost:9092
kafka.topic.tracking=tracking
kafka.group.id=trackingGroup
kafka.client.id=client-1
s2. Then I start 3 consumers by changing 'kafka.client.id' and running the Spring Boot main class. In the Eclipse console I can check the partition assignment:
client-1: partitions assigned:[tracking-4, tracking-3]
client-2: partitions assigned:[tracking-2, tracking-1]
client-3: partitions assigned:[tracking-0]
s3. I start the producer to send 20 messages to the topic; each consumer starts consuming messages from its assigned partitions.
s4. I close consumer 1, and Kafka rebalances automatically. The new partition assignment:
client-1: partitions assigned:[]
client-2: partitions assigned:[tracking-2,tracking-1, tracking-0]
client-3: partitions assigned:[tracking-4,tracking-3]
s5. I found that the messages on partition 'tracking-3' are not consumed!
The issue can be reproduced every time; some messages in the newly assigned partitions are always lost. Do you have any suggestions? Please help me, thanks.
Stopping a Kafka consumer: a REST API can be used to stop a running consumer. The consumer id is needed to identify the consumer to stop, so send it as the id parameter in a POST request to http://localhost:8080/api/kafka/registry/deactivate.
A Kafka rebalance happens when a consumer is added to (joins) or removed from (leaves) the consumer group. It becomes dramatic during an application deployment rollout, because multiple instances restart at the same time and rebalance latency increases significantly.
Within a consumer group, each partition is consumed by at most one consumer.
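To make the one-consumer-per-partition rule concrete, here is a minimal sketch of a range-style split of partitions across the members of a group. It is a simplified model, not Kafka's assignor code: the class name `RangeAssignmentSketch` and the method `assign` are made up for illustration, and the sketch does not model which member id ends up with which range.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of a range-style partition assignment within one consumer group. */
final class RangeAssignmentSketch {

    /**
     * Splits partitions 0..partitionCount-1 into contiguous ranges, one per consumer.
     * The first (partitionCount % consumerCount) consumers get one extra partition.
     */
    static List<List<Integer>> assign(int partitionCount, int consumerCount) {
        int base = partitionCount / consumerCount;
        int extra = partitionCount % consumerCount;
        List<List<Integer>> result = new ArrayList<>();
        int next = 0;
        for (int c = 0; c < consumerCount; c++) {
            int size = base + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                owned.add(next++); // each partition is handed to exactly one consumer
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(5, 3)); // [[0, 1], [2, 3], [4]]
        System.out.println(assign(5, 2)); // [[0, 1, 2], [3, 4]]
        System.out.println(assign(5, 6)); // [[0], [1], [2], [3], [4], []]
    }
}
```

With 5 partitions, three consumers get 2, 2 and 1 partitions, and two consumers get 3 and 2, which matches the group sizes reported in the question. A sixth consumer would stay idle.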
By default, the rebalance timeout is 5 minutes, which can be a very long period during which increasing consumer lag can become an issue.
I reproduced it; it looks like a problem in Kafka itself (with enable.auto.commit=true). On the rebalance, Kafka is reporting the "position" of the unread partitions (the offset of the next record that will be fetched, if a record with that offset exists) as the end of the partition.
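Why an offset stored as "end" loses records can be shown with a toy model that involves no Kafka client at all. The class `CommitTimingSketch` and its parameters are hypothetical; the sketch only assumes that a consumer taking over a partition resumes from the group's committed offset.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (no Kafka client involved) of one partition and its committed group offset. */
final class CommitTimingSketch {

    /**
     * Simulates a consumer that stops after handling handledBeforeStop records,
     * then a second consumer that resumes from the committed offset.
     * Returns the offsets that neither consumer ever handled.
     */
    static List<Integer> unprocessed(int logEnd, int handledBeforeStop, boolean commitEndUpFront) {
        boolean[] handled = new boolean[logEnd];
        int committed = 0;

        if (commitEndUpFront) {
            committed = logEnd; // offset stored as "end" before any record is handled
        }
        for (int offset = 0; offset < handledBeforeStop; offset++) {
            handled[offset] = true;
            if (!commitEndUpFront) {
                committed = offset + 1; // commit the next offset to read, after handling
            }
        }

        // Rebalance: the second consumer resumes from the committed offset.
        for (int offset = committed; offset < logEnd; offset++) {
            handled[offset] = true;
        }

        List<Integer> missed = new ArrayList<>();
        for (int offset = 0; offset < logEnd; offset++) {
            if (!handled[offset]) {
                missed.add(offset);
            }
        }
        return missed;
    }

    public static void main(String[] args) {
        System.out.println(unprocessed(4, 1, true));  // [1, 2, 3]
        System.out.println(unprocessed(4, 1, false)); // []
    }
}
```

In the first call the committed offset already points past the last record, so the consumer that takes over starts at the end and offsets 1 to 3 are never delivered. Committing only after each record is handled leaves nothing behind.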
In fact, when I use the kafka-consumer-groups tool, the offsets of the unread partitions are already at the "end". When I run it with just one consumer, while it is reading the first partition, I see...
$ kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group so43405009
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
tracking 0 37 40 3 client1-8129bb3d-3a83-4c83-9128-3a2762ede758 /10.0.0.6 client1
tracking 1 40 40 0 client1-8129bb3d-3a83-4c83-9128-3a2762ede758 /10.0.0.6 client1
tracking 2 40 40 0 client1-8129bb3d-3a83-4c83-9128-3a2762ede758 /10.0.0.6 client1
tracking 3 40 40 0 client1-8129bb3d-3a83-4c83-9128-3a2762ede758 /10.0.0.6 client1
tracking 4 40 40 0 client1-8129bb3d-3a83-4c83-9128-3a2762ede758 /10.0.0.6 client1
Notice the CURRENT-OFFSET column: partitions 1 to 4 already show 40, the log end offset, although only partition 0 is being read.
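The LAG column is LOG-END-OFFSET minus CURRENT-OFFSET, so a lag of 0 on a partition that has not been read yet means the group's offset was already stored at the end. A small sketch that recomputes the column from the listing above (the class name `LagSketch` is only illustrative):

```java
/** Recomputes the LAG column of the kafka-consumer-groups output from the two offset columns. */
final class LagSketch {

    /** Lag is the number of records between the group's current offset and the log end. */
    static long lag(long currentOffset, long logEndOffset) {
        return logEndOffset - currentOffset;
    }

    public static void main(String[] args) {
        // CURRENT-OFFSET values for partitions 0..4, copied from the listing above.
        long[] current = {37, 40, 40, 40, 40};
        long logEnd = 40;
        for (int p = 0; p < current.length; p++) {
            System.out.printf("tracking-%d lag=%d%n", p, lag(current[p], logEnd));
        }
    }
}
```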
On the next run, I ran it twice, once while the first partition is being processed, and again a bit later...
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
tracking 0 41 44 3 client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8 /10.0.0.6 client1
tracking 1 44 44 0 client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8 /10.0.0.6 client1
tracking 2 44 44 0 client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8 /10.0.0.6 client1
tracking 3 44 44 0 client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8 /10.0.0.6 client1
tracking 4 44 44 0 client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8 /10.0.0.6 client1
and
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
tracking 0 44 44 0 client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8 /10.0.0.6 client1
tracking 1 44 44 0 client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8 /10.0.0.6 client1
tracking 2 41 44 3 client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8 /10.0.0.6 client1
tracking 3 44 44 0 client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8 /10.0.0.6 client1
tracking 4 44 44 0 client1-56d78a4b-f2df-4e31-8c5d-f8a1d8f64cf8 /10.0.0.6 client1
See how the current offset of partition 2 went down from 44 to 41.
Disabling auto-commit solved it for me...
spring.kafka.consumer.enable-auto-commit=false
...
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
tracking 0 52 52 0 client1-59413599-81e8-49dd-bbd7-8a62152f11e5 /10.0.0.6 client1
tracking 1 49 52 3 client1-59413599-81e8-49dd-bbd7-8a62152f11e5 /10.0.0.6 client1
tracking 2 49 52 3 client2-edfe34f9-08d5-4825-80d0-4a6cf9526e42 /10.0.0.6 client2
tracking 3 48 52 4 client2-edfe34f9-08d5-4825-80d0-4a6cf9526e42 /10.0.0.6 client2
tracking 4 51 52 1 client3-20da8742-af38-403e-b125-5d0c7c771319 /10.0.0.6 client3
Here is my test program:
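import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
// Package as of spring-kafka 1.x; newer versions moved this class to org.springframework.kafka.listener.
import org.springframework.kafka.listener.config.ContainerProperties;

@SpringBootApplication
public class So43405009Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(So43405009Application.class, args);
    }

    @Autowired
    private KafkaTemplate<String, String> template;

    @Value("${spring.kafka.consumer.client-id}")
    private String clientId;

    @Override
    public void run(String... args) throws Exception {
        // Only the instance whose client id ends with "1" produces the 20 test records.
        if (this.clientId.endsWith("1")) {
            for (int i = 0; i < 20; i++) {
                this.template.sendDefault("foo" + i);
            }
        }
    }

    @Bean
    public KafkaMessageListenerContainer<String, String> container(ConsumerFactory<String, String> cf) {
        ContainerProperties containerProperties = new ContainerProperties("tracking");
        containerProperties.setMessageListener((MessageListener<?, ?>) d -> {
            System.out.println(d);
            try {
                // Slow listener, so that a rebalance can happen while records are still pending.
                Thread.sleep(5_000);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(cf,
                containerProperties);
        return container;
    }
}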
with properties
spring.kafka.listener.ack-mode=record
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=so43405009
spring.kafka.consumer.client-id=client1
spring.kafka.template.default-topic=tracking
I see the same results with 0.10.2.0 as well.
EDIT
It turns out to be a spring-kafka bug; it works with auto-commit enabled, but you have to explicitly enable it:
spring.kafka.consumer.enable-auto-commit=true
Otherwise the container assumes it's false and causes the above strange behavior; it looks like the client doesn't like calling the consumer's commit method if auto-commit is enabled. #288.
I would generally recommend setting it to false and choosing one of the container's AckModes instead; e.g. RECORD commits after every record, BATCH after every batch received by a poll (default).
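The practical difference between the two modes is how often offsets are committed. The sketch below is a toy model only: `AckModeSketch` and its `Mode` enum are hypothetical stand-ins, not the spring-kafka AckMode type, and empty polls are assumed to trigger no commit.

```java
/** Counts offset commits under per-record and per-batch acknowledgement (a toy model). */
final class AckModeSketch {

    /** Hypothetical stand-in for the two container ack modes discussed above. */
    enum Mode { RECORD, BATCH }

    /** batchSizes[i] is the number of records returned by the i-th poll. */
    static int commitCount(int[] batchSizes, Mode mode) {
        int commits = 0;
        for (int size : batchSizes) {
            if (size == 0) {
                continue; // assumption: nothing to commit after an empty poll
            }
            // RECORD: one commit per record; BATCH: one commit per non-empty poll.
            commits += (mode == Mode.RECORD) ? size : 1;
        }
        return commits;
    }

    public static void main(String[] args) {
        int[] polls = {3, 0, 2};
        System.out.println(commitCount(polls, Mode.RECORD)); // 5
        System.out.println(commitCount(polls, Mode.BATCH));  // 2
    }
}
```

RECORD narrows the window of records that can be redelivered after a rebalance to a single record, at the cost of more commit calls; BATCH commits less often but may redeliver the whole last batch.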