
kafka-python consumer start reading from offset (automatically)

I'm trying to build an application with kafka-python where a consumer reads data from a range of topics. It is extremely important that the consumer never reads the same message twice, but also never misses a message.

Everything seems to be working fine, except when I shut the consumer down (e.g. after a failure) and try to resume reading from the last offset. I can either re-read all the messages from the topic (which creates duplicate reads) or listen for new messages only (and miss messages that were emitted during the breakdown). I don't encounter this problem when pausing the consumer.

I created an isolated simulation in order to try to solve the problem.

Here is the generic producer:

from time import sleep
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

x = 0  # set manually to avoid duplicates

for e in range(1000):
    if e <= x:
        continue

    data = dumps({'number': e}).encode('utf-8')
    producer.send('numtest', value=data)
    print(e, 'sent.')

    sleep(5)
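As an aside, the manual dumps/encode step above can be factored into a serializer function, which kafka-python can apply automatically if passed as KafkaProducer(value_serializer=...). The round trip itself is plain Python and can be checked without a broker:

```python
from json import dumps, loads

# Serializer the producer would use via KafkaProducer(value_serializer=serialize),
# and the matching deserializer for the consumer side.
def serialize(value):
    return dumps(value).encode('utf-8')

def deserialize(raw):
    return loads(raw.decode('utf-8'))

# Round trip: what is sent to the topic is what comes back.
msg = {'number': 42}
assert deserialize(serialize(msg)) == msg
```

With value_serializer set, producer.send('numtest', value={'number': e}) would then take the dict directly, and the consumer side could mirror it with value_deserializer.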

And the consumer. If auto_offset_reset is set to 'earliest', all the messages will be read again. If auto_offset_reset is set to 'latest', no messages during down-time will be read.

from kafka import KafkaConsumer
from pymongo import MongoClient
from json import loads

## Retrieve data from kafka (WHAT ABOUT MISSED MESSAGES?)
consumer = KafkaConsumer('numtest', bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest', enable_auto_commit=True,
                         auto_commit_interval_ms=1000)


## Connect to database
client = MongoClient('localhost:27017')
collection = client.counttest.counttest

# Store each message in MongoDB
for message in consumer:
    message = loads(message.value.decode('utf-8'))
    collection.insert_one(message)
    print('{} added to {}'.format(message, collection))

I feel like the auto-commit isn't working properly.

I know that this question is similar to this one, but I would like a specific solution.

Thanks for helping me out.

asked Aug 11 '18 by Steven Van Dorpe

People also ask

How Kafka consumer maintains offset?

As each message is written to a partition, Kafka assigns it a sequential ID called an offset. Kafka then tracks consumption using these offsets on a per-consumer-group, per-partition basis. Kafka brokers keep track of both what has been sent to the consumer and what has been acknowledged by the consumer by using two offset values (the current fetch position and the committed offset).

How does Kafka consumer auto commit work?

Auto-commit basically works as a cron with a period set through the auto.commit.interval.ms configuration property. If the consumer crashes, then after a restart or a rebalance, the position of all partitions owned by the crashed consumer will be reset to the last committed offset.
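The re-delivery this implies can be sketched without a broker. The Partition class below is illustrative, not part of kafka-python; it models one partition with a periodic commit, counted in records instead of milliseconds:

```python
# Illustrative model of one partition: a log plus a committed offset.
class Partition:
    def __init__(self, log):
        self.log = log
        self.committed = 0  # offset to resume from after a restart

    def consume(self, n, commit_every):
        """Process n records, committing every commit_every records
        (a record-counted stand-in for auto.commit.interval.ms)."""
        processed = []
        pos = self.committed
        for i in range(n):
            processed.append(self.log[pos])
            pos += 1
            if (i + 1) % commit_every == 0:
                self.committed = pos  # periodic auto-commit
        # records processed since the last commit are NOT saved
        return processed

part = Partition(log=list(range(10)))
first_run = part.consume(5, commit_every=2)   # "crash" after 5 records
second_run = part.consume(5, commit_every=2)  # restart from committed offset

print(first_run)   # [0, 1, 2, 3, 4]
print(second_run)  # [4, 5, 6, 7, 8]  -> record 4 is processed twice
```

The uncommitted tail of the first run (record 4) is re-delivered on restart, which is exactly the at-least-once behavior auto-commit gives you.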

How do I set offset in Kafka consumer offset?

How do you change a consumer offset? Use the kafka-consumer-groups.sh tool to change or reset the offset. You have to specify the topic and consumer group, and use the --reset-offsets flag to change the offset.
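For example, a reset to the earliest available message might look like this (the group and topic names here are the ones used elsewhere on this page; substitute your own, and stop the consumers in the group before running it):

```shell
# Reset the committed offset for group "my-group" on topic "numtest"
# back to the earliest available message.
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group --topic numtest \
  --reset-offsets --to-earliest --execute

# Use --dry-run instead of --execute to preview the change first.
```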


2 Answers

You are getting this behavior because your consumer is not using a Consumer Group. With a Consumer Group, the consumer will regularly commit (save) its position to Kafka. That way if it's restarted it will pick up from its last committed position.

To make your consumer use a Consumer Group, you need to set group_id when constructing it. See group_id description from the docs:

The name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If None, auto-partition assignment (via group coordinator) and offset commits are disabled. Default: None

For example:

consumer = KafkaConsumer('numtest', bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest', enable_auto_commit=True,
                         auto_commit_interval_ms=1000, group_id='my-group')
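Even with a group_id, auto-commit only gives at-least-once delivery: if the consumer dies after inserting into MongoDB but before the next commit, the same message is re-delivered on restart. One defense is to make the sink idempotent by keying each document on the message's Kafka coordinates, since the (topic, partition, offset) triple is unique per record. A stdlib-only sketch (a dict stands in for the MongoDB collection; the class name is illustrative):

```python
# Idempotent sink sketch: re-inserting a re-delivered message is harmless
# because its (topic, partition, offset) key is already present.
class IdempotentSink:
    def __init__(self):
        self.docs = {}

    def insert(self, topic, partition, offset, value):
        key = (topic, partition, offset)
        if key in self.docs:
            return False        # duplicate delivery: skip
        self.docs[key] = value
        return True

sink = IdempotentSink()
assert sink.insert('numtest', 0, 7, {'number': 7}) is True
assert sink.insert('numtest', 0, 7, {'number': 7}) is False  # replay ignored
```

With real MongoDB the equivalent is to build the _id from those three fields (or put a unique index on them) so a replayed insert fails or upserts instead of creating a duplicate document.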
answered Oct 23 '22 by Mickael Maison

Is it possible to use the consumer from a different server? I already tried the same thing; below is the code, and it's not fetching any data from Kafka.

consumer = KafkaConsumer('tet', bootstrap_servers=['192.168.1.20:9092'],
                     auto_offset_reset='earliest', enable_auto_commit=True,
                     auto_commit_interval_ms=1000, group_id=None)

Note: when I give a wrong IP or port number, it throws exceptions.

answered Oct 11 '22 by Harsh