How can we create a topic in Kafka from the IDE using the API? When I run this:
bin/kafka-create-topic.sh --topic mytopic --replica 3 --zookeeper localhost:2181
I get the error:
bash: bin/kafka-create-topic.sh: No such file or directory
I followed the developer setup instructions exactly.
Kafka's core APIs include the following. The Admin API is used to manage and inspect topics, brokers, and other Kafka objects. The Producer API allows an application to publish (write) a stream of records to one or more Kafka topics. The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
The Kafka Streams API is used to implement stream processing applications and microservices. It provides higher-level functions to process event streams, including transformations, stateful operations like aggregations and joins, windowing, processing based on event time, and more.
In Kafka 0.8.1+ (the latest version of Kafka at the time of writing) you can programmatically create a new topic via AdminUtils. The functionality of CreateTopicCommand (part of the older Kafka 0.8.0), which was mentioned in one of the previous answers to this question, was moved to AdminUtils.
Scala example for Kafka 0.8.1:
    import java.util.Properties

    import kafka.admin.AdminUtils
    import kafka.utils.ZKStringSerializer
    import org.I0Itec.zkclient.ZkClient

    // Create a ZooKeeper client
    val sessionTimeoutMs = 10000
    val connectionTimeoutMs = 10000
    // Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then
    // createTopic() will only seem to work (it will return without error). The topic will exist
    // only in ZooKeeper and will be returned when listing topics, but Kafka itself does not
    // create the topic.
    val zkClient = new ZkClient("zookeeper1:2181", sessionTimeoutMs, connectionTimeoutMs,
        ZKStringSerializer)

    // Create a topic named "myTopic" with 8 partitions and a replication factor of 3
    val topicName = "myTopic"
    val numPartitions = 8
    val replicationFactor = 3
    val topicConfig = new Properties
    AdminUtils.createTopic(zkClient, topicName, numPartitions, replicationFactor, topicConfig)
Build dependencies, using sbt as an example:

    libraryDependencies ++= Seq(
      "com.101tec" % "zkclient" % "0.4",
      "org.apache.kafka" % "kafka_2.10" % "0.8.1.1"
        exclude("javax.jms", "jms")
        exclude("com.sun.jdmk", "jmxtools")
        exclude("com.sun.jmx", "jmxri"),
      ...
    )
EDIT: Added Java example for Kafka 0.9.0.0 (latest version as of Jan 2016).
Maven dependencies:
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.9.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.7</version>
    </dependency>
Code:
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.ZkConnection;

    import java.util.Properties;

    import kafka.admin.AdminUtils;
    import kafka.utils.ZKStringSerializer$;
    import kafka.utils.ZkUtils;

    public class KafkaJavaExample {

        public static void main(String[] args) {
            String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
            int sessionTimeoutMs = 10 * 1000;
            int connectionTimeoutMs = 8 * 1000;
            // Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then
            // createTopic() will only seem to work (it will return without error). The topic will exist in
            // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
            // topic.
            ZkClient zkClient = new ZkClient(
                zookeeperConnect,
                sessionTimeoutMs,
                connectionTimeoutMs,
                ZKStringSerializer$.MODULE$);

            // Security for Kafka was added in Kafka 0.9.0.0
            boolean isSecureKafkaCluster = false;
            ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);

            String topic = "my-topic";
            int partitions = 2;
            int replication = 3;
            Properties topicConfig = new Properties(); // add per-topic configurations settings here
            AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig);
            zkClient.close();
        }

    }
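Before calling createTopic(), it can help to sanity-check the arguments. To my understanding, AdminUtils rejects a partition count or replication factor that is not positive, and also a replication factor larger than the number of available brokers. The following is a minimal sketch of such a pre-flight check in plain Java. The class name TopicParamsCheck, the method validate, and the availableBrokers parameter are hypothetical and not part of the Kafka API; how you obtain the broker count is left open here.

```java
// Hypothetical helper (not part of Kafka): checks topic parameters up front.
final class TopicParamsCheck {

    // Throws IllegalArgumentException if the parameters could not be satisfied.
    // availableBrokers is an assumed input: the number of live brokers in the cluster.
    static void validate(int partitions, int replication, int availableBrokers) {
        if (partitions <= 0) {
            throw new IllegalArgumentException("number of partitions must be larger than 0");
        }
        if (replication <= 0) {
            throw new IllegalArgumentException("replication factor must be larger than 0");
        }
        if (replication > availableBrokers) {
            throw new IllegalArgumentException(
                "replication factor " + replication
                    + " is larger than available brokers " + availableBrokers);
        }
    }

    public static void main(String[] args) {
        // Same values as the example above, assuming a cluster of three brokers.
        validate(2, 3, 3);
        System.out.println("topic parameters look valid");
    }
}
```

With a single local broker (a common developer setup), a replication factor of 3 would fail this check, so a replication factor of 1 is the usual choice there.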
EDIT 2: Added Java example for Kafka 0.10.2.0 (latest version as of April 2017).
Maven dependencies:
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.9</version>
    </dependency>
Code:
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.ZkConnection;

    import java.util.Properties;

    import kafka.admin.AdminUtils;
    import kafka.admin.RackAwareMode;
    import kafka.utils.ZKStringSerializer$;
    import kafka.utils.ZkUtils;

    public class KafkaJavaExample {

        public static void main(String[] args) {
            String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
            int sessionTimeoutMs = 10 * 1000;
            int connectionTimeoutMs = 8 * 1000;

            String topic = "my-topic";
            int partitions = 2;
            int replication = 3;
            Properties topicConfig = new Properties(); // add per-topic configurations settings here

            // Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then
            // createTopic() will only seem to work (it will return without error). The topic will exist in
            // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
            // topic.
            ZkClient zkClient = new ZkClient(
                zookeeperConnect,
                sessionTimeoutMs,
                connectionTimeoutMs,
                ZKStringSerializer$.MODULE$);

            // Security for Kafka was added in Kafka 0.9.0.0
            boolean isSecureKafkaCluster = false;

            ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);
            AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
            zkClient.close();
        }

    }
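The examples above pass an empty topicConfig. As a minimal sketch of what filling it in might look like, the following builds a Properties object with a few topic-level overrides. The keys retention.ms, cleanup.policy, and max.message.bytes are standard Kafka topic-level configuration names; the values chosen, and the names TopicConfigExample and buildTopicConfig, are illustrative assumptions only.

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: builds per-topic overrides to pass as topicConfig.
final class TopicConfigExample {

    static Properties buildTopicConfig() {
        Properties topicConfig = new Properties();
        // Keep messages for 7 days (value is in milliseconds, stored as a string).
        topicConfig.setProperty("retention.ms", Long.toString(TimeUnit.DAYS.toMillis(7)));
        // Delete old log segments rather than compacting them.
        topicConfig.setProperty("cleanup.policy", "delete");
        // Largest message size accepted for this topic, in bytes (example value).
        topicConfig.setProperty("max.message.bytes", "1000000");
        return topicConfig;
    }

    public static void main(String[] args) {
        Properties topicConfig = buildTopicConfig();
        // The result would be passed as the topicConfig argument of createTopic().
        for (String key : topicConfig.stringPropertyNames()) {
            System.out.println(key + " = " + topicConfig.getProperty(key));
        }
    }
}
```

All values are set as strings, since Properties is a string-to-string map. Any key you do not set falls back to the broker-level default.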