I'm trying to consume messages from a Kafka topic. I'm using a wrapper around the confluent_kafka consumer. I need to check whether the connection is established before I start consuming messages.
I read that the consumer is lazy, so I need to perform some action for the connection to get established. But I want to check that the connection is established without doing a consume or poll operation.
Also, I tried giving some bad configurations to see what the response on a poll would be. The response I got was:
b'Broker: No more messages'
So, how do I decide if the connection parameters are faulty, the connection is broken, or there actually are no messages in the topic?
If Kafka runs as a systemd service, use 'systemctl status kafka' on the broker host to check the service status.
If you are looking for the Kafka cluster broker status, you can use the ZooKeeper CLI to find the details for each broker:
ls /brokers/ids returns the list of active broker IDs on the cluster.
get /brokers/ids/<id> returns the details of the broker with the given ID.
You can use consumer.assignment(): it returns the partitions currently assigned to the consumer, which you can compare against the partitions available for that topic to verify that all of them are assigned.
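The consumer.assignment() idea above can be sketched roughly as follows. This is a minimal sketch, not a definitive implementation: missing_partitions is a hypothetical helper name, and it assumes the consumer behaves like confluent_kafka.Consumer, where list_topics() returns cluster metadata and assignment() returns objects with .topic and .partition attributes.

```python
def missing_partitions(consumer, topic, timeout=10.0):
    """Return the partition ids of `topic` that are not assigned to `consumer`.

    Hypothetical helper: `consumer` is assumed to expose list_topics() and
    assignment() in the style of confluent_kafka.Consumer.
    """
    # Ask the cluster which partitions exist for the topic.
    metadata = consumer.list_topics(topic, timeout=timeout)
    topic_meta = metadata.topics.get(topic)
    if topic_meta is None or getattr(topic_meta, "error", None) is not None:
        raise LookupError(f"topic {topic!r} not found in cluster metadata")

    expected = set(topic_meta.partitions)  # mapping keyed by partition id
    assigned = {tp.partition for tp in consumer.assignment() if tp.topic == topic}
    return sorted(expected - assigned)
```

An empty list means every partition of the topic is assigned to this consumer. Two caveats: the assignment is normally populated only after the consumer has subscribed and polled at least once, and with several consumers in the same group no single consumer is expected to hold all partitions.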
During a broker outage, all partition replicas on the broker become unavailable, so the affected partitions' availability is determined by the existence and status of their other replicas. If a partition has no additional replicas, the partition becomes unavailable.
I am afraid there is no direct approach for testing whether Kafka brokers are up and running. Also note that if your consumer has already consumed all the messages, that is not bad behaviour, and it obviously does not indicate that the Kafka broker is down.
A possible workaround would be to perform some sort of quick operation and see if the broker responds. An example would be listing the topics:
Using confluent-kafka-python and AdminClient:
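# Example using confluent_kafka
from confluent_kafka import KafkaException
from confluent_kafka.admin import AdminClient

kafka_broker = {'bootstrap.servers': 'localhost:9092'}
admin_client = AdminClient(kafka_broker)
try:
    # A timeout keeps the call from blocking forever when no broker is reachable
    topics = admin_client.list_topics(timeout=10).topics
except KafkaException as exc:
    raise RuntimeError('Kafka broker is not reachable') from exc
if not topics:
    raise RuntimeError('No topics returned by the broker')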
Using kafka-python and KafkaConsumer:
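# Example using kafka-python
import kafka
from kafka.errors import NoBrokersAvailable

try:
    # The constructor typically raises NoBrokersAvailable if no broker can be reached
    consumer = kafka.KafkaConsumer(group_id='test', bootstrap_servers=['localhost:9092'])
except NoBrokersAvailable as exc:
    raise RuntimeError('Kafka broker is not reachable') from exc
topics = consumer.topics()
if not topics:
    raise RuntimeError('No topics returned by the broker')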
Use kafka-python at your own risk: the library has not been updated in years and might not be compatible with Amazon MSK or Confluent containers.
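Returning to the question of how to tell an empty topic from a faulty connection: 'Broker: No more messages' is the description of the _PARTITION_EOF event in confluent_kafka, which signals that the consumer reached the end of a partition and is not a connection failure. A rough sketch of classifying the result of poll() is shown here. classify_poll_result and its status strings are hypothetical, the message is assumed to behave like confluent_kafka.Message, and whether connection-level errors reach poll() at all (rather than an error_cb or the log) depends on client version and configuration, so treat that branch as an assumption.

```python
# Error names (as returned by KafkaError.name()) that point at the
# connection rather than at the topic contents.
CONNECTION_ERROR_NAMES = {"_ALL_BROKERS_DOWN", "_TRANSPORT", "_RESOLVE"}


def classify_poll_result(msg):
    """Map the return value of Consumer.poll() to a coarse status string.

    Hypothetical helper: `msg` is None or an object whose error() returns
    None or a KafkaError-like object with a name() method.
    """
    if msg is None:
        # poll() timed out: nothing arrived, which says nothing about the connection
        return "idle"
    err = msg.error()
    if err is None:
        return "message"
    name = err.name()
    if name == "_PARTITION_EOF":
        # 'Broker: No more messages': end of partition reached
        return "end_of_partition"
    if name in CONNECTION_ERROR_NAMES:
        return "connection_problem"
    return "other_error"
```

A typical use would be status = classify_poll_result(consumer.poll(1.0)). Note that recent client versions only emit the end-of-partition event when enable.partition.eof is set to true in the consumer configuration.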