
PyKafka metadata in bytes instead of strings

I'm seeing unusual behaviour with PyKafka, a client that I only recently began to use.

The error is the following:

Failed to connect newly created broker for b'4758e4ee1af6':9092
{0: <pykafka.broker.Broker at 0x7f319e19be10 (host=b'4758e4ee1af6',port=9092, id=0)>}

The error originates in these lines:

    from pykafka import KafkaClient

    # Inside a method of my own class; BROKER_ADDRESS is the seed broker's "host:port" string.
    self.client = KafkaClient(hosts=BROKER_ADDRESS, broker_version="0.10.1.0")
    consumer = self.client.topics[bytes(self.input_topic, "UTF-8")].get_balanced_consumer(
        consumer_group=bytes(self.consumer_group, "UTF-8"),
        auto_commit_enable=True
    )

While debugging, I saw that the client uses the correct IP string to connect to the seed broker, but when the list of brokers is retrieved, their addresses come back as bytes objects. When PyKafka then tries to connect again to create a consumer, these addresses obviously don't work.

Another problem, possibly related, is that I need to convert topic names and consumer group names to bytes myself (as with other clients), but all the examples in the docs use strings.

Kafka broker version: 0.10.1.0
PyKafka version: 2.7.0

Chobeat asked May 11 '18 15:05


2 Answers

Ok, I was completely misled: that wasn't an IP but a hostname generated by Docker (the container's short hexadecimal ID, not base64).
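One quick way to tell whether a value like the one in the error message is an IP address or a hostname is to try parsing it with Python's standard `ipaddress` module. This is only an illustrative sketch: `classify_host` is a hypothetical helper, not part of PyKafka, and the IPv4 address in the second call is made up.

```python
import ipaddress


def classify_host(raw):
    """Return "ip" if raw parses as an IPv4/IPv6 address, else "hostname".

    classify_host is a hypothetical helper for illustration, not a PyKafka API.
    """
    # Broker metadata may arrive as bytes (as in the error above), so decode first.
    host = raw.decode("utf-8") if isinstance(raw, bytes) else raw
    try:
        ipaddress.ip_address(host)
    except ValueError:
        # Not parseable as an address, so treat it as a name to be resolved.
        return "hostname"
    return "ip"


print(classify_host(b"4758e4ee1af6"))  # value taken from the error message
print(classify_host("192.168.1.10"))   # made-up IPv4 address for comparison
```

A value classified as a hostname has to be resolvable from the client machine before PyKafka can connect to it.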

Chobeat answered Oct 16 '22 11:10


Check your brokers' advertised.listeners config - it defines the hostnames that will be sent to ZooKeeper and onward to pykafka clients during pykafka's Cluster initialization. It's possible that Docker is corrupting this information, so you'll want to override it using advertised.listeners. From the documentation:

Listeners to publish to ZooKeeper for clients to use, if different than the listeners config property. In IaaS environments, this may need to be different from the interface to which the broker binds.
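Before changing the broker configuration, it can help to confirm whether the hostnames a broker advertises are resolvable from the machine running the client. The following is a minimal sketch using only the standard library; `unresolvable_hosts` and its `resolve` parameter are hypothetical names introduced here, not part of PyKafka or Kafka.

```python
import socket


def unresolvable_hosts(hosts, resolve=socket.gethostbyname):
    """Return the hostnames in `hosts` that `resolve` cannot look up.

    Hypothetical helper for illustration. `resolve` defaults to a real
    DNS lookup but can be swapped out, for example in tests.
    """
    failed = []
    for host in hosts:
        # Advertised hosts may be bytes (as in the error above) or str.
        name = host.decode("utf-8") if isinstance(host, bytes) else host
        try:
            resolve(name)
        except OSError:
            # socket.gaierror and socket.herror are subclasses of OSError.
            failed.append(name)
    return failed
```

Calling `unresolvable_hosts([b"4758e4ee1af6"])` on the client machine returns the names that fail to resolve there. If the Docker-generated hostname shows up in that list, the broker is most likely advertising a name that only exists inside the container network, which is the situation `advertised.listeners` is meant to address.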

As for the bytes/string issue, the latest development release of pykafka accepts strings or bytes for topic and consumer group names as a programmer convenience. For older versions, you will need to convert string arguments to bytes using a technique like this:

    topic_name = str_topic_name.encode('ascii')
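If the same code has to handle names that may already be bytes, a small wrapper avoids calling `.encode` on a bytes object. This is a sketch under assumptions: `to_bytes`, the topic name, and the group name are hypothetical and not part of pykafka.

```python
def to_bytes(name, encoding="ascii"):
    """Return `name` as bytes, encoding it only if it is a str.

    Hypothetical helper for illustration, not a pykafka API.
    """
    if isinstance(name, bytes):
        return name
    return name.encode(encoding)


# Example names are made up for illustration.
topic_name = to_bytes("my.topic")
consumer_group = to_bytes(b"my-group")
```

The resulting values can then be passed wherever an older pykafka version expects bytes, such as the topic lookup and the `consumer_group` argument shown in the question.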

Emmett Butler answered Oct 16 '22 12:10