kafka-python (1.0.0) throws error while connecting to the broker. At the same time /usr/bin/kafka-console-producer and /usr/bin/kafka-console-consumer work fine.
Python application used to work well also, but after zookeeper restart, it no longer can connect.
I am using bare bones example from the docs:
from kafka import KafkaProducer
from kafka.common import KafkaError
producer = KafkaProducer(bootstrap_servers=['hostname:9092'])
# Asynchronous by default
future = producer.send('test-topic', b'raw_bytes')
I am getting this error:
Traceback (most recent call last): File "pp.py", line 4, in <module>
producer = KafkaProducer(bootstrap_servers=['hostname:9092']) File "/usr/lib/python2.6/site-packages/kafka/producer/kafka.py", line 246, in __init__
self.config['api_version'] = client.check_version() File "/usr/lib/python2.6/site-packages/kafka/client_async.py", line 629, in check_version
connect(node_id) File "/usr/lib/python2.6/site-packages/kafka/client_async.py", line 592, in connect
raise Errors.NodeNotReadyError(node_id) kafka.common.NodeNotReadyError: 0 Exception AttributeError: "'KafkaProducer' object has no attribute '_closed'" in <bound method KafkaProducer.__del__ of <kafka.producer.kafka.KafkaProducer object at 0x7f6171294c50>> ignored
When stepping through ( /usr/lib/python2.6/site-packages/kafka/client_async.py) I noticed that line 270 evaluates as false:
270 if not self._metadata_refresh_in_progress and not self.cluster.ttl() == 0:
271 if self._can_send_request(node_id):
272 return True
273 return False
In my case self._metadata_refresh_in_progress is False, but the ttl() = 0;
At the same time kafka-console-* are happily pushing messages around:
/usr/bin/kafka-console-producer --broker-list hostname:9092 --topic test-topic
hello again
hello2
Any advice?
I had the same problem, and none of the solutions above worked. Then I read the exception messages and it seems it's mandatory to specify api_version, so
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],api_version=(0,1,0))
note: tuple
(1,0,0)
matching to kafka version1.0.0
works fine (at least completes without exceptions, now have to convince it to accept messages ;) )
I had a similar problem. In my case, broker hostname was unresolvable on the client side . Try to explicitly set advertised.host.name
in the configuration file.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With