
How to create a Topic in Kafka through Java

I want to create a topic in Kafka (kafka_2.8.0-0.8.1.1) through Java. Everything works fine if I create the topic from the command prompt and then push messages through the Java API, but I want to create the topic through the Java API as well. After a long search I found the code below:

    ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
    AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

I tried the code above and it reports that the topic is created, but I am not able to push messages to the topic. Is anything wrong in my code? Or is there another way to achieve this?

Jaya Ananthram asked Nov 20 '14 10:11



2 Answers

Edit - ZooKeeper is not required in newer versions of Kafka. Please see the answer by @Neeleshkumar Srinivasan Mannur for API version 0.11.0+.



Original answer

I fixed it after a lot of research.

    ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);
    AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());

With the code above, ZkClient creates the topic in ZooKeeper, but Kafka is not aware of it, because the data is not written with the string serializer Kafka expects. To fix this, the ZkClient object has to be created with that serializer.

First, add the following import:

    import kafka.utils.ZKStringSerializer$;

Then create the ZkClient object as follows:

    ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
    AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
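To get a feel for why the serializer matters, here is a small JDK-only sketch. It assumes that a ZkClient built without an explicit serializer falls back to Java object serialization, while ZKStringSerializer writes plain UTF-8 strings. The class and method names are made up for illustration and are not part of Kafka or zkclient.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class SerializerComparison {

    // Encodes the value with Java object serialization
    // (assumed here to be what ZkClient does by default).
    static byte[] javaSerialized(String value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Encodes the value as plain UTF-8 text
    // (assumed here to be what a string serializer writes).
    static byte[] utf8(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String data = "{\"version\":1}"; // hypothetical payload
        byte[] asObject = javaSerialized(data);
        byte[] asText = utf8(data);

        System.out.println("object serialization: " + asObject.length + " bytes");
        System.out.println("UTF-8 text:           " + asText.length + " bytes");
        System.out.println("identical: " + Arrays.equals(asObject, asText));
    }
}
```

The two byte arrays differ because object serialization adds a stream header before the text, so a reader that expects a plain string cannot parse what was written.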

Edit 1: (for @ajkret comment)

The above code won't work for Kafka > 0.9 since the API has changed. Use the code below for Kafka > 0.9:


    import java.util.Properties;

    import kafka.admin.AdminUtils;
    import kafka.utils.ZKStringSerializer$;
    import kafka.utils.ZkUtils;
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.ZkConnection;

    public class KafkaTopicCreationInJava {
        public static void main(String[] args) throws Exception {
            ZkClient zkClient = null;
            ZkUtils zkUtils = null;
            try {
                // If multiple zookeeper then ->
                // String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
                String zookeeperHosts = "192.168.20.1:2181";
                int sessionTimeOutInMs = 15 * 1000; // 15 secs
                int connectionTimeOutInMs = 10 * 1000; // 10 secs

                zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
                zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

                String topicName = "testTopic";
                int noOfPartitions = 2;
                int noOfReplication = 3;
                Properties topicConfiguration = new Properties();

                AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);

            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                if (zkClient != null) {
                    zkClient.close();
                }
            }
        }
    }
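One thing to watch in the example above: it asks for 3 replicas. Kafka rejects a replication factor larger than the number of available brokers, so on a single-broker setup the call fails and only the stack trace is printed. A minimal pre-check might look like this, assuming the caller already knows the broker count. The class, method, and parameter names are hypothetical and not part of Kafka.

```java
class TopicSettingsCheck {

    // Returns true when the settings could be satisfied by a cluster with
    // the given number of live brokers. liveBrokers is supplied by the
    // caller; this sketch does not query Kafka or ZooKeeper.
    static boolean isCreatable(int partitions, int replicationFactor, int liveBrokers) {
        if (partitions < 1 || replicationFactor < 1) {
            return false;
        }
        return replicationFactor <= liveBrokers;
    }

    public static void main(String[] args) {
        // Settings from the example above on a single-broker setup
        System.out.println(isCreatable(2, 3, 1)); // prints "false"
        // Same settings on a three-broker cluster
        System.out.println(isCreatable(2, 3, 3)); // prints "true"
    }
}
```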
Jaya Ananthram answered Sep 29 '22 18:09


The process seems to be much simpler in API 0.11.0+. With that version, it can be done as follows:

    import java.io.File;
    import java.io.FileReader;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.CreateTopicsResult;
    import org.apache.kafka.clients.admin.NewTopic;

    // Load the client configuration from kafka.properties
    Properties properties = new Properties();
    properties.load(new FileReader(new File("kafka.properties")));

    AdminClient adminClient = AdminClient.create(properties);

    // new NewTopic(topicName, numPartitions, replicationFactor)
    NewTopic newTopic = new NewTopic("topicName", 1, (short) 1);

    List<NewTopic> newTopics = new ArrayList<NewTopic>();
    newTopics.add(newTopic);

    adminClient.createTopics(newTopics);
    adminClient.close();

The contents of the kafka.properties file are as follows:

    bootstrap.servers=localhost:9092
    group.id=test
    enable.auto.commit=true
    auto.commit.interval.ms=1000
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
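If you want to check how these entries end up in the Properties object before handing it to the client, a JDK-only sketch like the following can help. It parses the same key=value text from a string instead of a file; the class and method names are made up for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class KafkaPropertiesExample {

    // Parses key=value text the same way Properties.load reads a file.
    static Properties parse(String text) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "bootstrap.servers=localhost:9092",
                "group.id=test",
                "enable.auto.commit=true");

        Properties properties = parse(text);
        System.out.println(properties.getProperty("bootstrap.servers")); // prints "localhost:9092"
    }
}
```

As far as I know, bootstrap.servers is the only entry here that the admin client needs; the other keys are consumer settings.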

Note that the AdminClient instance must be closed for the newly created topic to be reflected. createTopics() is asynchronous, and close() waits for pending requests to finish.

Neeleshkumar S answered Sep 29 '22 17:09