What is the best practice for keeping Kafka consumer alive in python?

Something puzzles me about keeping consumers alive. Say I have a topic to which data is constantly being written, but for one hour each day there are no new messages. If I set a timeout on my consumers, the consumer will be closed when no new messages arrive within that window.

Now, new messages arrive. But there are no consumers alive to consume them.

How should I handle such scenarios? My consumers may consume all messages and get closed. What is the best way to keep them alive? Is there any way to invoke them automatically upon the arrival of new messages? What are the best practices for such scenarios?

Amir Afianian asked May 23 '20 07:05

People also ask

How do I keep Kafka consumer alive?

The recommended way to handle these cases is to move message processing to another thread, which allows the consumer to continue calling poll while the processor is still working.
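The hand-off described above can be sketched as follows. Everything here is illustrative stand-in code (no broker required): `poll` simulates `consumer.poll()` returning raw bytes or None, and the worker thread does the slow processing so the poll loop never stalls.

```python
import queue
import threading

def worker(q, results):
    """Process payloads from the queue until a None sentinel arrives."""
    while True:
        payload = q.get()
        if payload is None:
            break
        results.append(payload.decode("utf-8"))  # the slow "processing" step

def poll_loop(poll, q, max_messages):
    """Keep calling poll() frequently; hand work off instead of doing it inline."""
    handled = 0
    while handled < max_messages:
        msg = poll()              # in real code: consumer.poll(1.0)
        if msg is None:
            continue              # nothing fetched; poll again right away
        q.put(msg)                # hand off to the worker thread
        handled += 1
    q.put(None)                   # tell the worker to shut down

# Simulate three messages arriving with a gap in between:
messages = iter([b"a", None, b"b", b"c"])
q, results = queue.Queue(), []
t = threading.Thread(target=worker, args=(q, results))
t.start()
poll_loop(lambda: next(messages, None), q, max_messages=3)
t.join()
print(results)  # -> ['a', 'b', 'c']
```

Because the loop never blocks on processing, poll() is called often enough that the broker does not consider the consumer dead (see max.poll.interval.ms).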

How can I make Kafka consumer faster?

Increasing the number of partitions and the number of brokers in a cluster will lead to increased parallelism of message consumption, which in turn improves the throughput of a Kafka cluster; however, the time required to replicate data across replica sets will also increase.

How use Kafka consumer in python?

Create a file named consumer1.py with the following Python script. The KafkaConsumer class is imported from the kafka-python library to read data from Kafka. The sys module is used here to terminate the script. The same hostname and port number as the producer's are used in the consumer script to read data from Kafka.
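A minimal version of the script described above might look like this; the hostname, port, and topic name ('first_topic') are assumptions, so adjust them to match your producer:

```python
# consumer1.py -- minimal kafka-python consumer sketch
import sys
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'first_topic',                       # assumed topic name
    bootstrap_servers='localhost:9092',  # same host:port as the producer
    auto_offset_reset='earliest',
)

try:
    for message in consumer:             # blocks, waiting for new records
        print(message.value.decode('utf-8'))
except KeyboardInterrupt:
    consumer.close()
    sys.exit(0)
```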

Can we pause Kafka consumer?

Kafka supports dynamic controlling of consumption flows by using pause(Collection) and resume(Collection) to pause the consumption on the specified assigned partitions and resume the consumption on the specified paused partitions respectively in the future poll(long) calls.
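A flow-control sketch of that pause/resume mechanism, using the confluent-kafka API (pause() and resume() take a list of TopicPartition objects, which assignment() provides; the broker address, group id, topic, and buffer threshold are assumptions):

```python
from confluent_kafka import Consumer

# Stop fetching while a local buffer is full, resume once it drains.
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'flow-controlled',
})
consumer.subscribe(['topicName'])

buffer, paused = [], False
while True:
    msg = consumer.poll(1.0)        # keep polling even while paused
    if msg is not None and not msg.error():
        buffer.append(msg.value())

    if len(buffer) >= 1000 and not paused:
        consumer.pause(consumer.assignment())   # stop fetching, stay in group
        paused = True
    elif not buffer and paused:
        consumer.resume(consumer.assignment())  # start fetching again
        paused = False
    # ... hand `buffer` over to the actual processing step here ...
```

Note that polling continues while paused, which keeps the consumer's group membership alive without fetching more records.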

How to use Kafka producer and consumer in Python?

Kafka Producer and Consumer in Python:

1. Set up. We should have Python installed on our machine for this tutorial. ...
2. Kafka Producer. Let us start creating our own Kafka producer. ...
3. Kafka Consumer. As we are finished with creating the producer, let us now start building the consumer in Python and see if that will be equally easy. ...
4. Conclusion. ...
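A producer counterpart to the consumer scripts above can be as short as this kafka-python sketch (broker address and topic name are assumptions):

```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('first_topic', b'hello from Python')  # value must be bytes
producer.flush()   # block until all buffered messages are delivered
producer.close()
```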

What is Kafka-Python?

kafka-python is a Python client for Apache Kafka, designed to work much like the official Java client. kafka-python is best used with newer versions (0.9+) of Kafka brokers, but it is backwards-compatible with older versions (to 0.8.0).

Why doesn't Kafka-Python support idempotence?

The first reason is that the kafka-python library does not support idempotence, which was the only feature that required the library to provide IDs for each message. Confluent Python is one of the best Python libraries for managing Kafka.

What are the best practices for working with Kafka consumers?

If your consumers are running versions of Kafka older than 0.10, upgrade them. In version 0.8.x, consumers use Apache ZooKeeper for consumer group coordination, and a number of known bugs can result in long-running rebalances or even failures of the rebalance algorithm.



2 Answers

Why not just

import time
from confluent_kafka import Consumer


consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-consumer-1',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['topicName'])

try:
    while True:
        message = consumer.poll(10.0)

        if message is None:
            time.sleep(120)  # No new messages; back off for 2 minutes
            continue

        if message.error():
            print(f"Consumer error: {message.error()}")
            continue

        print(f"Received message: {message.value().decode('utf-8')}")
except KeyboardInterrupt:
    pass
finally:
    consumer.close()  # Commit final offsets and leave the group cleanly
    print("Goodbye")

I cannot comment on the requirement of "setting a timeout for consumers", but in most cases consumers are supposed to run "forever", and they should also be added to consumer groups in such a way that they are highly available.

Giorgos Myrianthous answered Oct 08 '22 23:10

Use a generator function (assuming self.kafka is a kafka-python KafkaConsumer):

def consumableMessages(self):
    self.kafka.subscribe([self.topic])  # kafka-python expects a list of topics
    try:
        for message in self.kafka:  # the consumer is an iterator; this blocks
            yield message.value.decode("utf-8")
    except KeyboardInterrupt:
        self.kafka.close()

And then we can wait for messages:

for message in kafka.consumableMessages():
    print(message)
NotoriousPyro answered Oct 08 '22 21:10