Checking whether a topic exists in Kafka before creating it in Java

I am trying to create a topic in Kafka 0.8.2 using:

AdminUtils.createTopic(zkClient, myTopic, 2, 1, properties);

If I run the code more than once locally for testing, it fails because the topic already exists. Is there a way to check whether a topic exists before creating it? The TopicCommand API doesn't seem to return anything for listTopics or describeTopic.

nishantv asked Jun 01 '15 11:06

2 Answers

You can use AdminClient from kafka-clients version 0.11.0.0 or later.

Sample code:

    // Imports assumed: java.util.*, org.apache.kafka.clients.admin.*
    // The get() calls throw InterruptedException and ExecutionException.
    Properties config = new Properties();
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");

    // AdminClient is AutoCloseable; try-with-resources releases its connections.
    try (AdminClient admin = AdminClient.create(config)) {
        // names() returns a future; get() blocks until the broker responds.
        Set<String> names = admin.listTopics().names().get();
        boolean contains = names.contains("TEST_6");
        if (!contains) {
            Map<String, String> configs = new HashMap<>();
            int partitions = 5;
            short replication = 1;
            NewTopic newTopic = new NewTopic("TEST_6", partitions, replication).configs(configs);
            // createTopics() is asynchronous; all().get() waits for the result and surfaces errors.
            admin.createTopics(Collections.singletonList(newTopic)).all().get();
        }
    }
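
Note that checking `names.contains(...)` and then creating the topic is not atomic: another process can create the topic between the two calls. In that case `createTopics(...).all().get()` throws an `ExecutionException` whose cause is a `TopicExistsException`. A minimal sketch of treating that case as success follows; `TopicCreation` and `createIfAbsent` are hypothetical names, and the helper is written against plain `Callable` so it does not depend on Kafka classes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

final class TopicCreation {

    /**
     * Runs the create action and reports whether it created something.
     * An ExecutionException whose cause is an instance of alreadyExistsType
     * is treated as "already created by someone else" and yields false.
     * (Hypothetical helper for illustration, not part of the Kafka API.)
     */
    static boolean createIfAbsent(Callable<?> createAction,
                                  Class<? extends Throwable> alreadyExistsType)
            throws Exception {
        try {
            createAction.call();
            return true;
        } catch (ExecutionException e) {
            if (alreadyExistsType.isInstance(e.getCause())) {
                return false; // lost the race; the topic exists, which was the goal
            }
            throw e; // any other failure is a real error
        }
    }
}
```

With Kafka this might be called as `TopicCreation.createIfAbsent(() -> admin.createTopics(topicList).all().get(), TopicExistsException.class)`, where `TopicExistsException` comes from `org.apache.kafka.common.errors`.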
Raghava Reddy answered Sep 19 '22 13:09


    // Imports assumed: java.util.*, java.util.concurrent.ExecutionException,
    // java.util.stream.*, org.apache.kafka.clients.admin.*
    public static void createKafkaTopic(String sourceTopicName, String sinkTopicName, String responseTopicName, String kafkaUrl) {

        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);

        // try-with-resources closes the admin client when the method returns.
        try (AdminClient kafkaAdminClient = AdminClient.create(properties)) {
            ListTopicsResult topics = kafkaAdminClient.listTopics();
            Set<String> names = topics.names().get();

            boolean containsSourceTopic = names.contains(sourceTopicName);
            boolean containsSinkTopic = names.contains(sinkTopicName);
            boolean containsResponseTopic = names.contains(responseTopicName);

            // Create the topics only if none of the three exists yet.
            if (!containsResponseTopic && !containsSinkTopic && !containsSourceTopic) {
                CreateTopicsResult result = kafkaAdminClient.createTopics(
                        Stream.of(sourceTopicName, sinkTopicName, responseTopicName).map(
                                name -> new NewTopic(name, 1, (short) 1)
                        ).collect(Collectors.toList())
                );
                // Block until creation completes; failures surface as ExecutionException.
                result.all().get();
                LOG.info("new sourceTopicName: {}, sinkTopicName: {}, responseTopicName: {} are created",
                        sourceTopicName, sinkTopicName, responseTopicName);
            }
        } catch (ExecutionException e) {
            LOG.error("Failed to list or create topics: {}", e.getMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            LOG.error("Interrupted while listing or creating topics");
        }
    }
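
The condition above creates the three topics only when none of them exists, so a single pre-existing topic means the other two are never created. If that is not what you want, one option is to work out which names are missing first. This is a minimal sketch; `MissingTopics` and `missing` are hypothetical names, and `existing` is assumed to be the set returned by `listTopics().names().get()`.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class MissingTopics {

    /**
     * Returns the wanted topic names that are not in existing,
     * keeping the requested order and dropping duplicates.
     * (Hypothetical helper for illustration.)
     */
    static List<String> missing(Set<String> existing, List<String> wanted) {
        Set<String> result = new LinkedHashSet<>(wanted); // preserves insertion order
        result.removeAll(existing);
        return new ArrayList<>(result);
    }
}
```

Each name in the returned list could then be mapped to a `NewTopic` and passed to `createTopics` in one call; an empty list means there is nothing to create.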
Nhlamulo Socha Chauke answered Sep 21 '22 13:09