i have already started to learn Kafka. Trying basic operations on it. I have stucked on a point which about the 'Brokers'.
My kafka is running but when i want to create a partition.
from kafka import TopicPartition
(ERROR THERE) consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
consumer.assign([TopicPartition('foobar', 2)])
msg = next(consumer)
traceback (most recent call last): File "", line 1, in File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 284, in init self._client = KafkaClient(metrics=self._metrics, **self.config) File "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 202, in init self.config['api_version'] = self.check_version(timeout=check_timeout) File "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 791, in check_version raise Errors.NoBrokersAvailable() kafka.errors.NoBrokersAvailable: NoBrokersAvailable
Kafka consumers act as end-users or applications that retrieve data from Kafka servers inside which Kafka producers publish real-time messages. For effectively fetching real-time messages, Kafka consumers have to subscribe to the respective topics present inside the Kafka servers.
The Kafka producer is conceptually much simpler than the consumer since it has no need for group coordination. A producer partitioner maps each message to a topic partition, and the producer sends a produce request to the leader of that partition.
A Broker is a Kafka server that runs in a Kafka Cluster. Kafka Brokers form a cluster. The Kafka Cluster consists of many Kafka Brokers on many servers. Broker sometimes refer to more of a logical system or as Kafka as a whole.
I had the same error during kafka streaming. The code below resolved my error: We need to define the API version in KafkaProducer.
KafkaProducer(bootstrap_servers=['localhost:9092'],
api_version=(0,11,5),
value_serializer=lambda x: dumps(x).encode('utf-8'))
The problem for me, was the firewall rule as I am running Kafka on Google Cloud.
It was working for me yesterday and I was scratching my head today for 1 hour thinking about why it doesn't work anymore .
As the public IP address of my local system changes every time I connect to a different LAN or WiFi, I had to allow my local system's public IP in the firewall rules. I would suggest using a connection with a fixed public IP or to check for this whenever you switch/change your connection.
These small changes in the configurations take too much to debug and fix them. Felt like wasted an hour for this.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With