I'm running a 3-node Kafka cluster on AWS.
Kafka version: 0.10.2.1
ZooKeeper version: 3.4
While running a few stability tests, I noticed that messages get lost when I take the leader node down.
These are the steps to reproduce the issue:
Create a topic with replication factor 3, which should make the data available on all 3 nodes:
~ $ docker run --rm -ti ches/kafka bin/kafka-topics.sh --zookeeper "10.2.31.10:2181,10.2.31.74:2181,10.2.31.138:2181" --create --topic stackoverflow --replication-factor 3 --partitions 20
Created topic "stackoverflow".
~ $ docker run --rm -ti ches/kafka bin/kafka-topics.sh --zookeeper "10.2.31.10:2181,10.2.31.74:2181,10.2.31.138:2181" --describe --topic stackoverflow
Topic:stackoverflow PartitionCount:20 ReplicationFactor:3 Configs:
Topic: stackoverflow Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: stackoverflow Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: stackoverflow Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: stackoverflow Partition: 3 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: stackoverflow Partition: 4 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: stackoverflow Partition: 5 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: stackoverflow Partition: 6 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: stackoverflow Partition: 7 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: stackoverflow Partition: 8 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: stackoverflow Partition: 9 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: stackoverflow Partition: 10 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: stackoverflow Partition: 11 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: stackoverflow Partition: 12 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: stackoverflow Partition: 13 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: stackoverflow Partition: 14 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: stackoverflow Partition: 15 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: stackoverflow Partition: 16 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: stackoverflow Partition: 17 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: stackoverflow Partition: 18 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: stackoverflow Partition: 19 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
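Because the test kills a broker and waits for it to come back, it can help to compare the Replicas and Isr columns before and after the failover: a broker listed under Replicas but missing from Isr has not caught up with the leader yet. Below is a minimal debugging sketch that parses the `--describe` text in the format shown above. The helper name under_replicated and the regex are illustrative assumptions, not part of Kafka; kafka-topics.sh --describe also accepts --under-replicated-partitions, which filters the output in a similar way.

```python
import re

# Matches one partition line of `kafka-topics.sh --describe` output, e.g.
# "Topic: stackoverflow Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0".
# \s+ also accepts the tab separators Kafka usually prints.
PARTITION_RE = re.compile(
    r"Partition:\s*(\d+)\s+Leader:\s*(-?\d+)\s+Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]*)"
)


def under_replicated(describe_output):
    """Return {partition: (replicas, isr)} for partitions whose ISR differs from the replica set."""
    result = {}
    for line in describe_output.splitlines():
        match = PARTITION_RE.search(line)
        if not match:
            continue  # header line ("Topic:... PartitionCount:...") or other noise
        partition = int(match.group(1))
        replicas = set(int(b) for b in match.group(3).split(","))
        isr = set(int(b) for b in match.group(4).split(",") if b)
        if isr != replicas:
            result[partition] = (sorted(replicas), sorted(isr))
    return result
```

An empty result means every replica of every partition is in sync, which is the state the topic above starts in.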
Start producing on that topic with the following code:
import time
from kafka import KafkaProducer
from kafka.errors import KafkaError
producer = KafkaProducer(bootstrap_servers=['10.2.31.10:9092', '10.2.31.74:9092', '10.2.31.138:9092'])
try:
    count = 0
    while True:
        # send() returns a future; its result is not inspected here
        producer.send('stackoverflow', b'message')
        producer.flush()
        count += 1
        time.sleep(1)
except KeyboardInterrupt:
    print("Sent %s messages." % count)
At this point I kill one of the machines and wait until it rejoins the cluster.
When it's back, I stop the producer and consume all the messages from the topic.
from kafka import KafkaConsumer
consumer = KafkaConsumer('stackoverflow',
                         bootstrap_servers=['10.2.31.10:9092', '10.2.31.74:9092', '10.2.31.138:9092'],
                         auto_offset_reset='earliest',
                         enable_auto_commit=False)
try:
    count = 0
    for message in consumer:
        count += 1
        print(message)
except KeyboardInterrupt:
    print("Received %s messages." % count)
Two of the sent messages are missing, and the producer didn't report any errors:
kafka $ python producer.py
Sent 762 messages.
kafka $ python consumer.py
Received 760 messages.
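Because every payload in the producer above is the identical string 'message', the two counts show that something was lost but not which messages, or when. A minimal debugging sketch, assuming kafka-python's byte-string values: number each payload and diff the numbers on the consumer side. The helpers make_payload and find_missing are hypothetical names for illustration. Duplicates are tolerated because producer retries can deliver the same record twice.

```python
def make_payload(seq):
    """Encode a sequence number as the message value (bytes, as kafka-python expects)."""
    return str(seq).encode("ascii")


def find_missing(sent_count, received_values):
    """Return the sorted sequence numbers in [0, sent_count) that never arrived.

    received_values: iterable of raw message values (bytes), e.g. message.value
    for each record read back from the topic. Duplicates are collapsed by the set.
    """
    received = set(int(v.decode("ascii")) for v in received_values)
    return sorted(set(range(sent_count)) - received)
```

In the producer loop this would mean sending make_payload(count) instead of 'message', and in the consumer collecting message.value for every record before calling find_missing. The missing numbers then map directly to the seconds at which the failover happened.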
I'm new to Kafka, so I'd really appreciate any ideas for debugging this further, or instructions for making the cluster more resilient.
Thanks for the help!
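I ran into exactly the same issue some time ago. During the investigation I found an interesting detail: as stated in the documentation, the flush() method returns once every message in the buffer has either been sent or its request has resulted in an error. It does not raise in the second case, so a failed send goes unnoticed unless you check the result of each send() call.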
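I mitigated it by:
1. Setting unclean.leader.election.enable=false on the brokers. If it is not set, it defaults to true in Kafka < 0.11 and to false in Kafka >= 0.11, so on your 0.10.2 cluster you need to set it explicitly.
2. Changing producer.send(...) to producer.send(...).get(), so that each send blocks until the broker responds and raises an exception if the request failed.
3. Adding retries=5 to the KafkaProducer constructor, so that the producer survives a broker shutdown.
Let me know if it works for you.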
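The producer.send(...).get() change and retries=5 from the answer above can be combined into a small helper that sends each record synchronously and collects failures instead of dropping them. This is a sketch, not code from the answer: send_all is a hypothetical name, and it assumes `producer` is a kafka-python KafkaProducer, whose send() returns a future with a get(timeout=...) method. It catches Exception only to stay free of a kafka import; in practice the errors are kafka.errors.KafkaError subclasses.

```python
def send_all(producer, topic, payloads, timeout=10):
    """Send each payload synchronously; return a list of (payload, exception) failures.

    future.get() blocks until the broker acknowledges the record, or raises the
    error that made the send fail (after the producer's own retries are used up).
    """
    failures = []
    for payload in payloads:
        future = producer.send(topic, payload)
        try:
            future.get(timeout=timeout)  # surfaces the error instead of losing it
        except Exception as exc:  # kafka.errors.KafkaError subclasses in practice
            failures.append((payload, exc))
    return failures
```

With a real producer this would be called as send_all(KafkaProducer(bootstrap_servers=[...], retries=5), 'stackoverflow', [b'message']); a non-empty result means those records were never acknowledged. Blocking on every record trades throughput for certainty, which suits a stability test better than a high-volume producer.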
In the end I figured out that the lost messages were caused by an insufficient number of retries. After reading a few blog posts about highly available Kafka, I noticed that people recommend really high values for the "retries" parameter.
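In Python that would be:
import sys
from kafka import KafkaProducer
# sys.maxsize (sys.maxint on Python 2 only): effectively retry forever
producer = KafkaProducer(bootstrap_servers=[...], retries=sys.maxsize)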
I ran my tests again and confirmed that no messages were lost.
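A rough way to see why a small retry count was not enough: kafka-python waits retry_backoff_ms (100 ms by default) between attempts, so if each failed attempt fails fast (for example with a not-leader error while a new leader is being elected), retries=5 gives up after only about half a second, which can easily be shorter than the time the cluster needs to notice a killed broker and elect a new leader. The sketch below does that arithmetic under exactly that fast-failure assumption; the function names are illustrative, and outage_ms is whatever failover time you measure yourself, not a figure from the tests above.

```python
import math


def retry_window_ms(retries, retry_backoff_ms=100):
    """Approximate time the producer keeps retrying before giving up.

    Assumes every attempt fails quickly, so only the backoff between attempts counts.
    """
    return retries * retry_backoff_ms


def retries_to_cover(outage_ms, retry_backoff_ms=100):
    """Rough number of retries needed to keep retrying through an outage of outage_ms.

    Attempts that time out instead of failing fast take longer each, so fewer
    retries would be needed; this is a conservative (upper) estimate.
    """
    return int(math.ceil(outage_ms / float(retry_backoff_ms)))
```

For example, retry_window_ms(5) is 500 ms, while riding out a 30-second failover at the default backoff takes about 300 retries, which is why very large values such as sys.maxsize are the usual recommendation.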