
Messages lost when Kafka nodes are restarted

Tags:

apache-kafka

I'm running a 3-node Kafka cluster on AWS.

Kafka version: 0.10.2.1
Zookeeper version: 3.4

While performing a few stability tests I've noticed that messages get lost when I take the leader node down.

These are the steps to reproduce the issue:

Create a topic with replication factor 3, which should make the data available on all 3 nodes:

~ $ docker run --rm -ti ches/kafka bin/kafka-topics.sh --zookeeper "10.2.31.10:2181,10.2.31.74:2181,10.2.31.138:2181" --create --topic stackoverflow --replication-factor 3 --partitions 20
Created topic "stackoverflow".
~ $ docker run --rm -ti ches/kafka bin/kafka-topics.sh --zookeeper "10.2.31.10:2181,10.2.31.74:2181,10.2.31.138:2181" --describe --topic stackoverflow
Topic:stackoverflow    PartitionCount:20    ReplicationFactor:3    Configs:
    Topic: stackoverflow    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: stackoverflow    Partition: 1    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: stackoverflow    Partition: 2    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: stackoverflow    Partition: 3    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: stackoverflow    Partition: 4    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: stackoverflow    Partition: 5    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: stackoverflow    Partition: 6    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: stackoverflow    Partition: 7    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: stackoverflow    Partition: 8    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: stackoverflow    Partition: 9    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: stackoverflow    Partition: 10    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: stackoverflow    Partition: 11    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: stackoverflow    Partition: 12    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: stackoverflow    Partition: 13    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: stackoverflow    Partition: 14    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: stackoverflow    Partition: 15    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: stackoverflow    Partition: 16    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: stackoverflow    Partition: 17    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: stackoverflow    Partition: 18    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: stackoverflow    Partition: 19    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1

Start producing on that topic with the following code:

import time
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['10.2.31.10:9092', '10.2.31.74:9092', '10.2.31.138:9092'])

try:
    count = 0
    while True:
        # Fire-and-forget send; with the default serializer the value must be bytes.
        producer.send('stackoverflow', b'message')
        producer.flush()
        count += 1
        time.sleep(1)
except KeyboardInterrupt:
    print("Sent %s messages." % count)

At this point I kill one of the machines and wait until it returns to the cluster.

When it's back, I stop the producer and consume all the messages from that topic:

from kafka import KafkaConsumer

consumer = KafkaConsumer('stackoverflow',
                         bootstrap_servers=['10.2.31.10:9092', '10.2.31.74:9092', '10.2.31.138:9092'],
                         auto_offset_reset='earliest',
                         enable_auto_commit=False)
try:
    count = 0
    for message in consumer:
        count += 1
        print(message)
except KeyboardInterrupt:
    print("Received %s messages." % count)

Two of the messages that were sent are missing, yet the producer didn't return any errors:

kafka $ python producer.py
Sent 762 messages.

kafka $ python consumer.py
Received 760 messages.
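
With kafka-python, a plain send() is fire-and-forget: a failed write only surfaces if you inspect the future it returns, which the loop above never does. A minimal sketch of how a per-message error could be checked (same brokers as above; illustrative only, not part of the original test):

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['10.2.31.10:9092', '10.2.31.74:9092', '10.2.31.138:9092'])

def on_send_error(exc):
    # Invoked if the broker rejects the write or the request times out.
    print("Send failed: %s" % exc)

future = producer.send('stackoverflow', b'message')
future.add_errback(on_send_error)   # asynchronous error reporting

# Alternatively, block on the result and let a failure raise KafkaError:
try:
    record_metadata = future.get(timeout=10)
except KafkaError as exc:
    print("Send failed: %s" % exc)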

I'm new to Kafka, so I'd really appreciate any ideas for debugging this further, or instructions for making the cluster more resilient.

Thanks for the help!

Asked Sep 11 '17 by Akarot


2 Answers

I ran into exactly the same issue some time ago. During my investigation I found an interesting detail: the flush() method returns once each message in the buffer has been sent or the request has resulted in an error, as stated in the documentation.

I mitigated it by:

  1. Disabling unclean leader election on the brokers (the setting is unclean.leader.election.enable; if not set, it defaults to true in Kafka < 0.11 and false in Kafka >= 0.11, so you need to set it to false on your 0.10.2 cluster).
  2. Changing the asynchronous producer (send & flush) to a synchronous one: producer.send(...).get() (sketched below).
  3. Adding the parameter retries=5 to the KafkaProducer init (to make the producer survive a broker shutdown).
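
Roughly, points 2 and 3 translate to the following with kafka-python (a sketch only; broker addresses taken from the question, and point 1 is a broker-side setting):

from kafka import KafkaProducer
from kafka.errors import KafkaError

# Point 1 is broker-side: unclean.leader.election.enable=false in server.properties.
producer = KafkaProducer(
    bootstrap_servers=['10.2.31.10:9092', '10.2.31.74:9092', '10.2.31.138:9092'],
    retries=5)  # point 3: retry while a new partition leader is being elected

try:
    # Point 2: .get() makes the send synchronous and raises KafkaError
    # if the write ultimately fails after all retries.
    producer.send('stackoverflow', b'message').get(timeout=10)
except KafkaError as exc:
    print("Send failed: %s" % exc)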

Let me know if it works for you.

Answered Nov 09 '22 by Mariusz


In the end I figured out that the reason for the lost messages was an insufficient number of retries. After reading a few blog posts about highly available Kafka, I noticed that people recommend really high values for the retries parameter.

In Python that would be:

producer = KafkaProducer(bootstrap_servers=[...], retries=sys.maxint)

I performed my tests again and confirmed that no messages were lost.
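
For reference, the question's producer loop with only that change applied would look roughly like this (sys.maxsize being the Python 3 spelling of sys.maxint):

import sys
import time
from kafka import KafkaProducer

# Very high retries so the client keeps retrying while a new partition leader is elected.
producer = KafkaProducer(
    bootstrap_servers=['10.2.31.10:9092', '10.2.31.74:9092', '10.2.31.138:9092'],
    retries=sys.maxsize)  # sys.maxint on Python 2

try:
    count = 0
    while True:
        producer.send('stackoverflow', b'message')
        producer.flush()
        count += 1
        time.sleep(1)
except KeyboardInterrupt:
    print("Sent %s messages." % count)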

Answered Nov 09 '22 by Akarot