
kafka-python: producer is not able to connect

kafka-python (1.0.0) throws an error while connecting to the broker, yet /usr/bin/kafka-console-producer and /usr/bin/kafka-console-consumer work fine.

The Python application used to work too, but since a ZooKeeper restart it can no longer connect.

I am using the bare-bones example from the docs:

from kafka import KafkaProducer
from kafka.common import KafkaError

producer = KafkaProducer(bootstrap_servers=['hostname:9092'])

# Asynchronous by default
future = producer.send('test-topic', b'raw_bytes')

I am getting this error:

Traceback (most recent call last):
  File "pp.py", line 4, in <module>
    producer = KafkaProducer(bootstrap_servers=['hostname:9092'])
  File "/usr/lib/python2.6/site-packages/kafka/producer/kafka.py", line 246, in __init__
    self.config['api_version'] = client.check_version()
  File "/usr/lib/python2.6/site-packages/kafka/client_async.py", line 629, in check_version
    connect(node_id)
  File "/usr/lib/python2.6/site-packages/kafka/client_async.py", line 592, in connect
    raise Errors.NodeNotReadyError(node_id)
kafka.common.NodeNotReadyError: 0
Exception AttributeError: "'KafkaProducer' object has no attribute '_closed'" in <bound method KafkaProducer.__del__ of <kafka.producer.kafka.KafkaProducer object at 0x7f6171294c50>> ignored

While stepping through /usr/lib/python2.6/site-packages/kafka/client_async.py, I noticed that the condition on line 270 evaluates to False:

270         if not self._metadata_refresh_in_progress and not self.cluster.ttl() == 0:
271             if self._can_send_request(node_id):
272                 return True
273         return False

In my case self._metadata_refresh_in_progress is False, but self.cluster.ttl() returns 0, so the condition fails and the method returns False.
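
To make the effect of that check concrete, here is a standalone restatement of the condition as a plain function. It is only a sketch: is_ready and its three parameters are hypothetical names standing in for the attributes and calls in the excerpt above, and it is not the library's actual code.

```python
def is_ready(metadata_refresh_in_progress, metadata_ttl, can_send_request):
    """Mirror the shape of the check quoted from client_async.py (lines 270-273)."""
    # Same condition as in the excerpt: no refresh in progress AND ttl() != 0.
    if not metadata_refresh_in_progress and not metadata_ttl == 0:
        if can_send_request:
            return True
    return False


# The situation from the question: no refresh in progress, but ttl() == 0.
print(is_ready(False, 0, True))     # False
# Same inputs with a non-zero ttl (5000 is an arbitrary illustrative value).
print(is_ready(False, 5000, True))  # True
```

With ttl() at 0 the result is False even when the connection itself would be able to send a request.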

Meanwhile, the kafka-console-* tools are happily pushing messages around:

/usr/bin/kafka-console-producer --broker-list hostname:9092 --topic test-topic
hello again
hello2

Any advice?

alex_123 asked Feb 28 '16 22:02


2 Answers

I had the same problem, and none of the solutions above worked. Then I read the exception messages, and it seems that specifying api_version is mandatory, so

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], api_version=(0, 1, 0))

works fine (at least it completes without exceptions; now I have to convince it to accept messages ;) ).

Note: the api_version tuple corresponds to the Kafka version, e.g. (1, 0, 0) for Kafka 1.0.0.
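
Since send() is asynchronous, one way to find out whether the broker actually accepts a message is to block on the returned future, as the kafka-python docs example does with future.get(timeout=...). A minimal sketch under stated assumptions: build_producer and send_and_confirm are hypothetical helper names, the host, topic and version values are placeholders, and api_version is passed through unchanged, so check the docs of your installed kafka-python release for the format it expects.

```python
def build_producer(servers, api_version):
    """Create a producer with an explicit api_version (requires kafka-python)."""
    # Imported lazily so send_and_confirm can be tried without kafka-python installed.
    from kafka import KafkaProducer
    return KafkaProducer(bootstrap_servers=servers, api_version=api_version)


def send_and_confirm(producer, topic, payload, timeout=10):
    """Send one message and wait for the result.

    Returns (True, record_metadata) on success or (False, exception) on failure.
    """
    try:
        future = producer.send(topic, payload)
        # Block until the send completes or the timeout expires.
        return True, future.get(timeout=timeout)
    except Exception as exc:  # broad on purpose; kafka-python errors land here too
        return False, exc


# Usage (needs a reachable broker; all values below are placeholders):
# producer = build_producer(['localhost:9092'], api_version=(0, 1, 0))
# ok, result = send_and_confirm(producer, 'test-topic', b'raw_bytes')
# print(ok, result)
```

If ok comes back False, the returned exception usually says more about what went wrong than the constructor completing silently does.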

Egor Kraev answered Oct 02 '22 23:10


I had a similar problem. In my case, the broker hostname could not be resolved on the client side. Try setting advertised.host.name explicitly in the broker configuration file.
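
One quick way to test for this from the client machine is to try resolving the broker's name with the standard library. A minimal sketch: can_resolve is a hypothetical helper, and 9092 is just the port used in the question. The name to check is the one the broker advertises, which may differ from the one you pass in bootstrap_servers.

```python
import socket


def can_resolve(hostname, port=9092):
    """Return True if this machine can resolve hostname to at least one address."""
    try:
        return len(socket.getaddrinfo(hostname, port)) > 0
    except (socket.gaierror, UnicodeError):
        # gaierror: the name lookup failed; UnicodeError: the name is malformed.
        return False


# Usage ('hostname' is the placeholder broker name from the question):
# print(can_resolve('hostname'))
```

If this returns False for the advertised name, the client cannot reach the broker by that name no matter what the Kafka settings are, so fix DNS or /etc/hosts on the client, or change what the broker advertises.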

user3503929 answered Oct 03 '22 00:10