I'm trying to build an application with kafka-python where a consumer reads data from a range of topics. It is extremely important that the consumer never reads the same message twice, but also never misses a message.
Everything seems to be working fine, except when I turn off the consumer (e.g. failure) and try to start reading from offset. I can only read all the messages from the topic (which creates double reads) or listen for new messages only (and miss messages that where emitted during the breakdown). I don't encounter this problem when pausing the consumer.
I created an isolated simulation in order to try to solve the problem.
Here the generic producer:
from time import sleep
from json import dumps
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
x=0 # set manually to avoid duplicates
for e in range(1000):
if e <= x:
pass
else:
data = dumps(
{
'number' : e
}
).encode('utf-8')
producer.send('numtest', value=data)
print(e, ' send.')
sleep(5)
And the consumer. If auto_offset_reset
is set to 'earliest'
, all the messages will be read again. If auto_offset_reset
is set to 'latest'
, no messages during down-time will be read.
from kafka import KafkaConsumer
from pymongo import MongoClient
from json import loads
## Retrieve data from kafka (WHAT ABOUT MISSED MESSAGES?)
consumer = KafkaConsumer('numtest', bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest', enable_auto_commit=True,
auto_commit_interval_ms=1000)
## Connect to database
client = MongoClient('localhost:27017')
collection = client.counttest.counttest
# Send data
for message in consumer:
message = loads(message.value.decode('utf-8'))
collection.insert_one(message)
print('{} added to {}'.format(message, collection))
I feel like the auto-commit isn't working properly.
I know that this questions is similar to this one, but I would like a specific solution.
Thanks for helping me out.
As each message is received by Kafka, it allocates a message ID to the message. Kafka then maintains the message ID offset on a by consumer and by partition basis to track consumption. Kafka brokers keep track of both what is sent to the consumer and what is acknowledged by the consumer by using two offset values.
Auto-commit basically works as a cron with a period set through the auto.commit.interval.ms configuration property. If the consumer crashes, then after a restart or a rebalance, the position of all partitions owned by the crashed consumer will be reset to the last committed offset.
How to change consumer offset? Use the kafka-consumer-groups.sh to change or reset the offset. You would have to specify the topic, consumer group and use the –reset-offsets flag to change the offset.
You are getting this behavior because your consumer is not using a Consumer Group. With a Consumer Group, the consumer will regularly commit (save) its position to Kafka. That way if it's restarted it will pick up from its last committed position.
To make your consumer use a Consumer Group, you need to set group_id
when constructing it.
See group_id
description from the docs:
The name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If None, auto-partition assignment (via group coordinator) and offset commits are disabled. Default: None
For example:
consumer = KafkaConsumer('numtest', bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest', enable_auto_commit=True,
auto_commit_interval_ms=1000, group_id='my-group')
Is that possible to using consumer from different server. I already tried the same below is the code and its not fetching any data from kafka.
consumer = KafkaConsumer('tet', bootstrap_servers=['192.168.1.20:9092'],
auto_offset_reset='earliest', enable_auto_commit=True,
auto_commit_interval_ms=1000, group_id=None)
Note:- When I am giving wrong ip or port number its throws exceptions.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With