I want to create a topic in Kafka (kafka_2.8.0-0.8.1.1) through java. It is working fine if I create a topic in command prompt, and If I push message through java api. But I want to create a topic through java api. After a long search I found below code,
ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000); AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
I tried above code and it is showing that topic is created but I am not able to push message in the topic. Any thing wrong in my code? Or any other way to achieve the above?
Kafka topics are partitioned, which distributes data across multiple brokers for scalability. They can be replicated in order to make the data fault-tolerant and highly available. Topics also retain events even after consumption for as long as required.
To create a Kafka topic programmatically introduce a configuration class that annotated with @Configuration : this annotation indicates that the Java class can be used by Spring as a source of bean definitions. Next to the name of the Kafka topic name you can specify: the number of partitions for the topic.
Create a Kafka Topic Don't worry about them right now - they are used to control specific aspects related to distributed systems in Kafka. As you are running a simple setup, you can specify “1” for both parameters. Now that you have everything up and running, you can start integrating Kafka with a Java application!
Edit - Zookeeper is not required in newer version of Kafka. Please see answer by @Neeleshkumar Srinivasan Mannur for API version 0.11.0+
I fixed it.. After a long research..
ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000); AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
From the above code, ZkClient will create a topic but this topic information will not have awareness for the kafka. So what we have to do is, we need to create object for ZkClient in following way,
First import the below statement,
import kafka.utils.ZKStringSerializer$;
and create object for ZkClient in the following way,
ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000, ZKStringSerializer$.MODULE$); AdminUtils.createTopic(zkClient, myTopic, 10, 1, new Properties());
The above code won't work for kafka > 0.9 since the api has been changed, Use the below code for kafka > 0.9
import java.util.Properties; import kafka.admin.AdminUtils; import kafka.utils.ZKStringSerializer$; import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; public class KafkaTopicCreationInJava { public static void main(String[] args) throws Exception { ZkClient zkClient = null; ZkUtils zkUtils = null; try { String zookeeperHosts = "192.168.20.1:2181"; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181"; int sessionTimeOutInMs = 15 * 1000; // 15 secs int connectionTimeOutInMs = 10 * 1000; // 10 secs zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$); zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false); String topicName = "testTopic"; int noOfPartitions = 2; int noOfReplication = 3; Properties topicConfiguration = new Properties(); AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration); } catch (Exception ex) { ex.printStackTrace(); } finally { if (zkClient != null) { zkClient.close(); } } } }
The process seems to be pretty much simplified in API 0.11.0+. Using that, it can be done as follows
import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.NewTopic; Properties properties = new Properties(); properties.load(new FileReader(new File("kafka.properties"))); AdminClient adminClient = AdminClient.create(properties); NewTopic newTopic = new NewTopic("topicName", 1, (short)1); //new NewTopic(topicName, numPartitions, replicationFactor) List<NewTopic> newTopics = new ArrayList<NewTopic>(); newTopics.add(newTopic); adminClient.createTopics(newTopics); adminClient.close();
The contents of kafka.properties
file are as follows
bootstrap.servers=localhost:9092 group.id=test enable.auto.commit=true auto.commit.interval.ms=1000 key.deserializer=org.apache.kafka.common.serialization.StringDeserializer value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
Note that the instance of the AdminClient must be closed in order to reflect the newly created topic.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With