How to programmatically check if Kafka Broker is up and running in Python

I'm trying to consume messages from a Kafka topic, using a wrapper around the confluent_kafka consumer. I need to check whether the connection is established before I start consuming messages.

I read that the consumer is lazy, so I need to perform some action for the connection to be established. But I want to verify the connection without doing a consume or poll operation.

Also, I tried giving some bad configurations to see what the response on a poll would be. The response I got was:

b'Broker: No more messages'

So, how do I decide if the connection parameters are faulty, the connection is broken, or there actually are no messages in the topic?

asked Apr 15 '20 at 10:04 by ghost



1 Answer

As far as I know, there is no direct way to test whether the Kafka brokers are up and running. Also note that a poll returning no new messages is normal when your consumer has already read everything in the topic; it does not indicate that the Kafka broker is down.
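To tell "no more messages" apart from a connection problem, one option is to look at the error's name rather than its text. The sketch below is only an illustration: `classify_poll_error` is a hypothetical helper, and it assumes the error object exposes `.name()` the way `confluent_kafka.KafkaError` does, with names such as `_PARTITION_EOF` (the error behind `Broker: No more messages`), `_ALL_BROKERS_DOWN`, `_TRANSPORT` and `_RESOLVE`.

```python
def classify_poll_error(err):
    """Map a KafkaError-like object to a coarse category.

    `err` is assumed to expose .name(), as confluent_kafka.KafkaError does.
    Pass None when there was no error.
    """
    if err is None:
        return "ok"
    name = err.name()
    if name == "_PARTITION_EOF":
        # End of partition reached: the broker answered, nothing left to read
        return "no_more_messages"
    if name in ("_ALL_BROKERS_DOWN", "_TRANSPORT", "_RESOLVE"):
        # Broker unreachable, connection dropped, or host name not resolvable
        return "connection_problem"
    return "other_error"
```

With a real consumer this would be called as `classify_poll_error(msg.error())` on a non-None message returned by `poll()`. Depending on the client version, connection-level errors may be delivered to the `error_cb` configuration callback instead of appearing as message errors, in which case the same helper could be called from that callback.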


A possible workaround is to perform a quick operation and see whether the broker responds. One example is listing the topics:

Using confluent-kafka-python and AdminClient

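# Example using confluent_kafka
from confluent_kafka import KafkaException
from confluent_kafka.admin import AdminClient

kafka_broker = {'bootstrap.servers': 'localhost:9092'}
admin_client = AdminClient(kafka_broker)

try:
    # Pass a timeout (in seconds); without one the call can block
    # for a long time when no broker is reachable
    topics = admin_client.list_topics(timeout=10).topics
except KafkaException as exc:
    raise RuntimeError('Could not reach a Kafka broker') from exc

if not topics:
    raise RuntimeError('Broker responded but reported no topics')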

Using kafka-python and KafkaConsumer

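# Example using kafka-python
import kafka
from kafka.errors import NoBrokersAvailable

try:
    # The constructor typically raises NoBrokersAvailable when
    # none of the bootstrap servers can be reached
    consumer = kafka.KafkaConsumer(group_id='test', bootstrap_servers=['localhost:9092'])
except NoBrokersAvailable as exc:
    raise RuntimeError('Could not reach a Kafka broker') from exc

topics = consumer.topics()

if not topics:
    raise RuntimeError('Broker responded but reported no topics')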

Use kafka-python at your own risk: at the time of writing, the library has not been updated in years and might not be compatible with Amazon MSK or Confluent containers.
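If neither client library is available, a cruder check is to see whether anything accepts TCP connections on the broker's host and port. This is a minimal sketch using only the standard library; `tcp_port_open` is a hypothetical helper name, and a successful connection only shows that some process is listening there, not that it is a healthy Kafka broker.

```python
import socket


def tcp_port_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the host and connects; the context
        # manager closes the socket again straight away
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures
        return False
```

For the setup in the examples above, this would be called as `tcp_port_open('localhost', 9092)`.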

answered Oct 11 '22 at 02:10 by Giorgos Myrianthous