Something puzzles me about keeping consumers alive. Say I have a topic to which data is constantly being written, but for one hour each day there are no new messages. If I set a timeout on my consumers, they will be closed when no new messages arrive.
Then new messages arrive, but there are no consumers alive to consume them.
How should I handle such scenarios? My consumers may consume all messages and get closed. What is the best way to keep them alive? Is there any way to invoke them automatically upon the arrival of new messages? What are the best practices for such scenarios?
When processing a message takes a long time, the recommended approach is to move message processing to another thread, so the consumer can keep calling poll while the processor is still working.
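A minimal sketch of that hand-off, assuming a consumer object whose `poll(timeout)` returns `None` when nothing arrived (as `confluent_kafka.Consumer.poll` does). The helper names `start_worker` and `poll_loop` are hypothetical, and a plain `queue.Queue` stands in for whatever buffer you use. The polling loop never blocks on processing, so the consumer stays in its group while the worker thread is busy.

```python
import queue
import threading


def start_worker(handle, work, stop):
    """Run `handle` on every item put into `work`, in a background thread."""
    def loop():
        # Keep going until asked to stop AND the queue has been drained.
        while not (stop.is_set() and work.empty()):
            try:
                item = work.get(timeout=0.1)
            except queue.Empty:
                continue
            try:
                handle(item)
            finally:
                work.task_done()

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread


def poll_loop(consumer, work, stop, poll_timeout=1.0):
    """Keep calling poll() so the consumer stays alive while the worker is busy."""
    while not stop.is_set():
        msg = consumer.poll(poll_timeout)
        if msg is None or msg.error():
            continue  # nothing new (or an error): just poll again
        work.put(msg.value())
```

With a single worker, messages are processed in the order they were polled. Note that with auto-commit enabled, offsets of messages handed to the worker may be committed before they are processed, so a crash can lose them; committing manually after `handle` finishes avoids that.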
Increasing the number of partitions and the number of brokers in a cluster will lead to increased parallelism of message consumption, which in turn improves the throughput of a Kafka cluster; however, the time required to replicate data across replica sets will also increase.
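To see why the partition count caps consumer parallelism, here is a simplified sketch of range-style assignment, similar in spirit to Kafka's RangeAssignor but not its actual code (`range_assign` is a hypothetical name). Each partition goes to exactly one consumer in a group, so consumers beyond the partition count receive nothing.

```python
def range_assign(num_partitions, consumers):
    """Split partitions 0..num_partitions-1 into contiguous ranges, one per consumer."""
    members = sorted(consumers)
    if not members:
        return {}
    per, extra = divmod(num_partitions, len(members))
    assignment, start = {}, 0
    for i, member in enumerate(members):
        # The first `extra` consumers get one additional partition each.
        count = per + (1 if i < extra else 0)
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment
```

For example, `range_assign(3, ["c1", "c2", "c3", "c4"])` leaves `c4` with an empty list: with 3 partitions, at most 3 consumers in the group can do useful work.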
Create a file named consumer1.py with the following Python script. The KafkaConsumer class is imported from the kafka library to read data from Kafka, and the sys module is used to terminate the script. The consumer uses the same hostname and port number as the producer to read data from Kafka.
Kafka lets you control the consumption flow dynamically: pause(Collection) stops fetching from the specified assigned partitions, and resume(Collection) restarts fetching from the specified paused partitions. Both take effect in subsequent poll(long) calls.
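In Python, `confluent_kafka.Consumer` exposes the same idea through `assignment()`, `pause(partitions)` and `resume(partitions)`. Below is a minimal backpressure sketch under that assumption; the `Backpressure` class and its `high`/`low` thresholds are hypothetical. It pauses every assigned partition when a local buffer fills up and resumes once it drains, while the caller keeps calling `poll()` so the consumer stays in its group.

```python
class Backpressure:
    """Pause fetching when the local buffer is full, resume once it drains.

    `consumer` is assumed to expose assignment(), pause(list) and resume(list),
    as confluent_kafka.Consumer does; `buffer` is anything with qsize().
    """

    def __init__(self, consumer, buffer, high, low):
        self.consumer = consumer
        self.buffer = buffer
        self.high = high
        self.low = low
        self.paused = []

    def update(self):
        """Call after each poll(); toggles pause/resume based on buffer size."""
        size = self.buffer.qsize()
        if not self.paused and size >= self.high:
            self.paused = list(self.consumer.assignment())
            self.consumer.pause(self.paused)
        elif self.paused and size <= self.low:
            self.consumer.resume(self.paused)
            self.paused = []
```

Using two thresholds instead of one avoids flapping between pause and resume on every message. The sketch does not handle a rebalance that changes the assignment while partitions are paused.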
Kafka Producer and Consumer in Python:
1. Set up. We should have Python installed on our machine for this tutorial. ...
2. Kafka Producer. Let us start creating our own Kafka Producer. ...
3. Kafka Consumer. Now that we have finished creating the producer, let us build a consumer in Python and see if that is equally easy.
4. Conclusion. ...
kafka-python is a Python client for Apache Kafka. It is designed to work much like the official Java client. kafka-python is best used with newer (0.9+) Kafka brokers, but it is backwards compatible with older versions (down to 0.8.0).
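This is directly relevant to the timeout in the question. kafka-python's `KafkaConsumer` takes a `consumer_timeout_ms` argument: iterating the consumer raises `StopIteration` after that many milliseconds without a message, and by default it blocks forever. Ending the `for` loop does not close the consumer, so if you want a timeout (for example, to do periodic housekeeping), you can simply iterate again. A minimal sketch; `consume_with_idle_timeout` and the `keep_running` callback are hypothetical names.

```python
def consume_with_idle_timeout(consumer, handle, keep_running):
    """Consume forever, treating an idle timeout as 'wait again', not 'close'.

    `consumer` is assumed to be a kafka-python KafkaConsumer created with
    consumer_timeout_ms=..., so each `for` loop ends after an idle period.
    Returns the number of idle rounds seen, which is handy for logging.
    """
    idle_rounds = 0
    while keep_running():
        for record in consumer:
            handle(record.value)
        # The iterator ended: no message within consumer_timeout_ms.
        # The consumer is still open; do housekeeping here, then wait again.
        idle_rounds += 1
    return idle_rounds
```

`keep_running` should eventually return `False` (for example, on shutdown), because a closed consumer ends iteration immediately and the loop would otherwise spin.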
The first reason is that the kafka-python library does not support idempotence, which was the only feature that required the library to provide IDs for each message. Confluent's Python client is one of the best Python libraries for working with Kafka.
Best practices for working with consumers: if your consumers are running versions of Kafka older than 0.10, upgrade them. In version 0.8.x, consumers use Apache ZooKeeper for consumer group coordination, and a number of known bugs can result in long-running rebalances or even failures of the rebalance algorithm.
Why not just keep polling in a loop that never exits?
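import time
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-consumer-1',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['topicName'])

try:
    while True:
        # poll() waits up to 10 s and returns None if no message arrived
        message = consumer.poll(10.0)
        if message is None:
            # Idle: sleep for 2 minutes, then poll again. Keep this well below
            # max.poll.interval.ms (5 minutes by default) to stay in the group.
            time.sleep(120)
            continue
        if message.error():
            print(f"Consumer error: {message.error()}")
            continue
        print(f"Received message: {message.value().decode('utf-8')}")
except KeyboardInterrupt:
    # Handle Ctrl+C (or any other exception you expect) here
    pass
finally:
    # Close once, when leaving the loop, so the group rebalances promptly
    consumer.close()
    print("Goodbye")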
I cannot comment on the requirement of "setting a timeout for consumers", but in most cases consumers are supposed to run "forever". They should also be members of a consumer group so that consumption stays highly available: if one consumer fails, its partitions are reassigned to the remaining members.
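A consumer that runs "forever" still needs a clean way to stop, for example when a process manager or container runtime sends SIGTERM. A minimal sketch, assuming a `confluent_kafka`-style consumer (`poll` returns `None` on timeout, `close()` leaves the group); `install_stop_handler` and `run_forever` are hypothetical names. The signal handler only sets an event; the loop notices it, exits, and closes the consumer exactly once. `signal.signal` must be called from the main thread.

```python
import signal
import threading


def install_stop_handler(stop):
    """Set `stop` on SIGTERM or SIGINT so the consumer loop can exit cleanly."""
    def _handler(signum, frame):
        stop.set()

    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, _handler)


def run_forever(consumer, handle, stop, poll_timeout=1.0):
    """Poll until asked to stop; idle periods just mean 'poll again'."""
    try:
        while not stop.is_set():
            msg = consumer.poll(poll_timeout)
            if msg is None:
                continue  # no new messages yet: keep waiting, don't exit
            if msg.error():
                print(f"Consumer error: {msg.error()}")
                continue
            handle(msg.value())
    finally:
        # Leave the group promptly so its partitions are reassigned
        consumer.close()
```

Typical usage: create `stop = threading.Event()`, call `install_stop_handler(stop)`, subscribe the consumer, then call `run_forever(consumer, process, stop)`.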
Use a generator function
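def consumableMessages(self):
    # self.kafka is a kafka-python KafkaConsumer. Iterating it blocks until the
    # next message arrives (consumer_timeout_ms defaults to "forever").
    self.kafka.subscribe([self.topic])
    try:
        for message in self.kafka:
            yield message.value.decode("utf-8")
    except KeyboardInterrupt:
        pass
    finally:
        # Runs on Ctrl+C and when the generator is closed
        self.kafka.close()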
And then we can wait for messages:
for message in kafka.consumableMessages():
    print(message)