
Kafka not receiving messages when indicating group_id in Python

I am using Kafka (kafka-python) version 3.0.0-1.3.0.0.p0.40. I need to configure a consumer for the topic 'simulation' in Python. When I don't specify a group_id (i.e. group_id=None), the consumer receives messages fine. However, as soon as I specify a group_id, it doesn't receive any messages.

Here's my code in Python:

from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers='XXX.XXX.XXX.XXX:9092',
                         group_id='myTestGroupID', enable_auto_commit=True)
consumer.subscribe(['simulation'])
# not using assign method here as auto_commit is enabled
# partitions = [TopicPartition('simulation', num) for num in range(0, 9)]
# consumer.assign([partitions[0]])

while not self.stop_event.is_set():
    for message in consumer:
        print(message)

I searched the consumer properties files for a default group_id value and found one, cloudera_mirrormaker, but using it changed nothing. I will need to run multiple consumers, so it is important that they all have a group_id and share the same one. Many sources say that the group_id can be any string...

When I run a consumer for this topic in the console, it works and receives messages:

./kafka-console-consumer.sh --bootstrap-server XXX.XXX.XXX.XXX:9092 --topic simulation --from-beginning --consumer-property group.id=myTestGroupID  --partition 0

When I run kafka-consumer-groups.sh to list all available groups, the list is empty.

If anyone has an idea why it's stuck in Python, it would be much appreciated. Thanks a lot.

Here is the code for the producer (I've reduced it for simplicity, as in this case it doesn't change the problem):

import threading
import time

from kafka import KafkaProducer

class Producer(threading.Thread):
    ...
    def run(self):
        producer = KafkaProducer(bootstrap_servers='XXX.XXX.XXX.XXX:9092')
        while not self.stop_event.is_set():
            # send a timestamped test message every half second
            string = 'test %s' % time.time()
            producer.send('simulation', string.encode())
            time.sleep(0.5)
        producer.close()

Alexander Komarov asked Nov 07 '22 02:11



1 Answer

I've finally solved it.

The issue was in the omkafka config file: the partitions.number attribute was 1 by default.

We changed it to 100, which is what we needed, and it started working. I hope this helps.
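
For context on why the partition count matters once a group_id is set: consumers that share a group_id divide the topic's partitions among themselves, and each partition is read by only one member of the group at a time. The sketch below is a simplified round-robin model in plain Python, not kafka-python's actual assignor. The consumer names are made up, and the nine partitions only mirror the range(0, 9) from the question. It may not be the whole story for this particular setup, but it shows how a topic with a single partition leaves the other members of a group with nothing to read.

```python
def assign_round_robin(partitions, consumers):
    """Deal partitions out to group members one at a time (simplified model)."""
    # Start every consumer with an empty list so idle members stay visible.
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# Hypothetical members of one consumer group (same group_id).
consumers = ["consumer-a", "consumer-b", "consumer-c"]

# One partition: only one group member has anything to read.
single = assign_round_robin([0], consumers)

# Nine partitions: every member owns a share of the topic.
many = assign_round_robin(list(range(9)), consumers)

print(single)
print(many)
```

With a single partition, only consumer-a is assigned anything; with nine partitions each of the three members gets three of them.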

Alexander Komarov answered Nov 14 '22 21:11
