Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Kafka: Get broker host from ZooKeeper

For particular reasons I need to use both - ConsumerGroup (a.k.a. high-level consumer) and SimpleConsumer (a.k.a. low-level consumer) to read from Kafka. For ConsumerGroup I use ZooKeeper-based config and am completely satisfied with it, but SimpleConsumer requires seed brokers to be instantiated.

I don't want to keep list of both - ZooKeeper and broker hosts. Thus, I'm looking for a way to automatically discover brokers for a particular topic from ZooKeeper.

Because of some indirect information I belief that these data is stored in ZooKeeper under one of the following paths:

  • /brokers/topics/<topic>/partitions/<partition-id>/state
  • /brokers/ids/

However, when I try to read data from these nodes, I'm getting serialization error (I'm using com.101tec.zkclient for this):

org.I0Itec.zkclient.exception.ZkMarshallingError: java.io.StreamCorruptedException: invalid stream header: 7B226A6D at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:37) at org.I0Itec.zkclient.ZkClient.derializable(ZkClient.java:740) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:773) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744) ... 64 elided Caused by: java.io.StreamCorruptedException: invalid stream header: 7B226A6D at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804) at java.io.ObjectInputStream.(ObjectInputStream.java:299) at org.I0Itec.zkclient.serialize.TcclAwareObjectIputStream.(TcclAwareObjectIputStream.java:30) at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:31) ... 69 more

I can write and read custom Java objects (e.g. Strings) without any problem, so I believe it's not a problem of a client, but rather tricky encoding. Thus, I want to know:

  1. If this is the right way to go, how to read these nodes properly?
  2. If the whole approach is wrong, what is the right one?
like image 734
ffriend Avatar asked Apr 07 '15 11:04

ffriend


People also ask

How do I get broker ID in Kafka?

you can use kafka-manager, an open-source tool powered by yahoo. Show activity on this post. This will list all of the brokers with their id and the beginning offset.

How do I check my Kafka broker port?

Answer : To find the Kafka port number - locate the Kafka server. properties file. Typically the server. properties file will have the information required.


3 Answers

That is the way of what one of my colleagues did to get a list of Kafka brokers. I think it's a correct way when you want to get a broker list dynamically.

Here is an example code that shows how to get the list.

public class KafkaBrokerInfoFetcher {

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null);
        List<String> ids = zk.getChildren("/brokers/ids", false);
        for (String id : ids) {
            String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null));
            System.out.println(id + ": " + brokerInfo);
        }
    }
}

Running the code onto the cluster consisting of three brokers results in

1: {"jmx_port":-1,"timestamp":"1428512949385","host":"192.168.0.11","version":1,"port":9093}
2: {"jmx_port":-1,"timestamp":"1428512955512","host":"192.168.0.11","version":1,"port":9094}
3: {"jmx_port":-1,"timestamp":"1428512961043","host":"192.168.0.11","version":1,"port":9095}
like image 163
Heejin Avatar answered Oct 23 '22 05:10

Heejin


It turns out that Kafka uses ZKStringSerializer to read and write data into znodes. So, to fix the error I only had to add it as a last parameter in ZkClient constructor:

val zkClient = new ZkClient(zkQuorum, Integer.MAX_VALUE, 10000, ZKStringSerializer)

Using it, I wrote several useful functions for discovering broker ids, their addresses and other stuff:

import kafka.utils.Json
import kafka.utils.ZKStringSerializer
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.KafkaException


def listBrokers(): List[Int] = {
  zkClient.getChildren("/brokers/ids").toList.map(_.toInt)
}

def listTopics(): List[String] = {
  zkClient.getChildren("/brokers/topics").toList
}

def listPartitions(topic: String): List[Int] = {
  val path = "/brokers/topics/" + topic + "/partitions"
  if (zkClient.exists(path)) {
    zkClient.getChildren(path).toList.map(_.toInt)
  } else {
    throw new KafkaException(s"Topic ${topic} doesn't exist")
  }
}

def getBrokerAddress(brokerId: Int): (String, Int) = {
  val path = s"/brokers/ids/${brokerId}"
  if (zkClient.exists(path)) {
    val brokerInfo = readZkData(path)
    (brokerInfo.get("host").get.asInstanceOf[String], brokerInfo.get("port").get.asInstanceOf[Int])
  } else {
    throw new KafkaException("Broker with ID ${brokerId} doesn't exist")
  }
}

def getLeaderAddress(topic: String, partitionId: Int): (String, Int) = {
  val path = s"/brokers/topics/${topic}/partitions/${partitionId}/state"
  if (zkClient.exists(path)) {
    val leaderStr = zkClient.readData[String](path)
    val leaderId = Json.parseFull(leaderStr).get.asInstanceOf[Map[String, Any]].get("leader").get.asInstanceOf[Int]
    getBrokerAddress(leaderId)
  } else {
    throw new KafkaException(s"Topic (${topic}) or partition (${partitionId}) doesn't exist")
  }
}
like image 33
ffriend Avatar answered Oct 23 '22 07:10

ffriend


To do this using the shell:

zookeeper-shell myzookeeper.example.com:2181
ls /brokers/ids
  => [2, 1, 0]
get /brokers/ids/2
get /brokers/ids/1
get /brokers/ids/0 
like image 9
phayes Avatar answered Oct 23 '22 07:10

phayes