
Check for the existence of a Kafka topic programmatically in Java

How can I check programmatically whether a topic has been created in a Kafka cluster, without using CLI tools, before trying to produce to it?

I'm running into a problem where our application tries to produce to a topic that doesn't exist, but it is only notified after 90 seconds (the metadata timeout). Is there a way to find out from within Java code whether the topic exists, so that we can check before actually attempting to send the message? I could look at the code the Kafka CLI utilities use, but I was wondering whether there's an API or an easier way that I may have missed.

asked Oct 23 '18 23:10 by mjuarez


2 Answers

You could use AdminClient#listTopics() to check if a given topic exists, as shown below:

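import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// try-with-resources closes the client when the block exits
try (AdminClient client = AdminClient.create(props)) {
    ListTopicsOptions options = new ListTopicsOptions();
    options.listInternal(true); // includes internal topics such as __consumer_offsets
    ListTopicsResult topics = client.listTopics(options);
    // get() blocks until the broker responds; it throws InterruptedException and ExecutionException
    Set<String> currentTopicList = topics.names().get();
    // do your filter logic here, e.g. currentTopicList.contains(...)
}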
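
As one possible shape for the filter logic, here is a minimal sketch that reports which required topics are missing from the returned set. It uses only the JDK; the class name TopicFilter, the method missingTopics, and the topic names are made up for illustration, and the hard-coded set stands in for the result of topics.names().get().

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class TopicFilter {
    // Returns the required topics that are absent from the cluster's topic list.
    // The comparison is exact because Kafka topic names are case-sensitive.
    static Set<String> missingTopics(Set<String> existing, List<String> required) {
        Set<String> missing = new TreeSet<>(required); // sorted copy, leaves the input untouched
        missing.removeAll(existing);
        return missing;
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for topics.names().get()
        Set<String> currentTopicList = Set.of("orders", "payments", "__consumer_offsets");
        // "Orders" differs from "orders" only by case, so it is reported as missing
        System.out.println(missingTopics(currentTopicList, List.of("orders", "Orders", "audit")));
        // prints [Orders, audit]
    }
}
```

An empty result means every required topic exists, so the application can decide up front whether to produce or fail fast.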
answered Oct 12 '22 14:10 by amethystic


For older Kafka versions (e.g. 1.0.0), you can use the AdminUtils.topicExists(..) method to check whether a topic exists:

    int sessionTimeOutInMs = 15 * 1000;
    int connectionTimeOutInMs = 10 * 1000;
    String zkHost = "localhost:2181"; // ZooKeeper address, not the broker address
    ZkClient zkClient = new ZkClient(zkHost, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zkHost), false); // false = ZooKeeper security disabled
    System.out.println(AdminUtils.topicExists(zkUtils, "TopicName"));
    zkUtils.close(); // release the ZooKeeper connection

AdminUtils is deprecated in recent Kafka versions, so with Kafka 1.0+ you can use AdminClient instead:

    Properties prop = new Properties();
    prop.setProperty("bootstrap.servers", "localhost:9092");
    // try-with-resources closes the client when the block exits
    try (AdminClient admin = AdminClient.create(prop)) {
        // Topic names are case-sensitive, so use an exact match rather than equalsIgnoreCase
        boolean topicExists = admin.listTopics().names().get().contains("tealium.topic");
    }
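
Since the question is about not waiting out the 90-second metadata timeout, the lookup itself can also be bounded. The sketch below is JDK-only and relies on the fact that the KafkaFuture returned by admin.listTopics().names() implements java.util.concurrent.Future, so it should be possible to pass it straight in. The class name BoundedTopicCheck, the helper signature, and the timeout values are assumptions for illustration; treating a timeout or failed lookup as "topic not found" is one policy choice among several.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedTopicCheck {
    // Returns true only if the topic list arrives within timeoutMs and contains the topic.
    // A timeout or a failed lookup is reported as false (an assumed policy, adjust as needed).
    static boolean topicExists(Future<Set<String>> names, String topic, long timeoutMs)
            throws InterruptedException {
        try {
            return names.get(timeoutMs, TimeUnit.MILLISECONDS).contains(topic);
        } catch (TimeoutException | ExecutionException e) {
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-ins for admin.listTopics().names(): one already completed, one that never completes
        Future<Set<String>> ready = CompletableFuture.completedFuture(Set.of("tealium.topic"));
        Future<Set<String>> stalled = new CompletableFuture<>();
        System.out.println(topicExists(ready, "tealium.topic", 1000));
        System.out.println(topicExists(stalled, "tealium.topic", 50));
    }
}
```

With a real client the call would look like topicExists(admin.listTopics().names(), "tealium.topic", 5000), which returns after at most about five seconds instead of blocking indefinitely.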
answered Oct 12 '22 14:10 by Nishu Tayal