Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How Can we create a topic in Kafka from the IDE using API

How Can we create a topic in Kafka from the IDE using API because when I do this:

bin/kafka-create-topic.sh --topic mytopic --replica 3 --zookeeper localhost:2181 

I get the error:

bash: bin/kafka-create-topic.sh: No such file or directory 

And I followed the developer setup as it is.

like image 596
ramu Avatar asked Jun 05 '13 17:06

ramu


People also ask

Does Kafka have an API?

Kafka has four core APIs: The Producer API allows an application to publish a stream of records to one or more Kafka topics. The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.

What is Kafka in API?

The Kafka Streams API to implement stream processing applications and microservices. It provides higher-level functions to process event streams, including transformations, stateful operations like aggregations and joins, windowing, processing based on event-time, and more.

Which client API is used for managing Kafka objects?

Kafka APIs The Admin API to manage and inspect topics, brokers, and other Kafka objects. The Producer API to publish (write) a stream of events to one or more Kafka topics.


1 Answers

In Kafka 0.8.1+ -- the latest version of Kafka as of today -- you can programmatically create a new topic via AdminCommand. The functionality of CreateTopicCommand (part of the older Kafka 0.8.0) that was mentioned in one of the previous answers to this question was moved to AdminCommand.

Scala example for Kafka 0.8.1:

import kafka.admin.AdminUtils import kafka.utils.ZKStringSerializer import org.I0Itec.zkclient.ZkClient  // Create a ZooKeeper client val sessionTimeoutMs = 10000 val connectionTimeoutMs = 10000 // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then // createTopic() will only seem to work (it will return without error).  The topic will exist in // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the // topic. val zkClient = new ZkClient("zookeeper1:2181", sessionTimeoutMs, connectionTimeoutMs,     ZKStringSerializer)  // Create a topic named "myTopic" with 8 partitions and a replication factor of 3 val topicName = "myTopic" val numPartitions = 8 val replicationFactor = 3 val topicConfig = new Properties AdminUtils.createTopic(zkClient, topicName, numPartitions, replicationFactor, topicConfig) 

Build dependencies, using sbt as example:

libraryDependencies ++= Seq(   "com.101tec" % "zkclient" % "0.4",   "org.apache.kafka" % "kafka_2.10" % "0.8.1.1"     exclude("javax.jms", "jms")     exclude("com.sun.jdmk", "jmxtools")     exclude("com.sun.jmx", "jmxri"),   ... ) 

EDIT: Added Java example for Kafka 0.9.0.0 (latest version as of Jan 2016).

Maven dependencies:

<dependency>     <groupId>org.apache.kafka</groupId>     <artifactId>kafka_2.11</artifactId>     <version>0.9.0.0</version> </dependency> <dependency>     <groupId>com.101tec</groupId>     <artifactId>zkclient</artifactId>     <version>0.7</version> </dependency> 

Code:

import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection;  import java.util.Properties;  import kafka.admin.AdminUtils; import kafka.utils.ZKStringSerializer$; import kafka.utils.ZkUtils;  public class KafkaJavaExample {    public static void main(String[] args) {     String zookeeperConnect = "zkserver1:2181,zkserver2:2181";     int sessionTimeoutMs = 10 * 1000;     int connectionTimeoutMs = 8 * 1000;     // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then     // createTopic() will only seem to work (it will return without error).  The topic will exist in     // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the     // topic.     ZkClient zkClient = new ZkClient(         zookeeperConnect,         sessionTimeoutMs,         connectionTimeoutMs,         ZKStringSerializer$.MODULE$);      // Security for Kafka was added in Kafka 0.9.0.0     boolean isSecureKafkaCluster = false;     ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);      String topic = "my-topic";     int partitions = 2;     int replication = 3;     Properties topicConfig = new Properties(); // add per-topic configurations settings here     AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig);     zkClient.close();   }  } 

EDIT 2: Added Java example for Kafka 0.10.2.0 (latest version as of April 2017).

Maven dependencies:

<dependency>     <groupId>org.apache.kafka</groupId>     <artifactId>kafka_2.11</artifactId>     <version>0.10.2.0</version> </dependency> <dependency>     <groupId>com.101tec</groupId>     <artifactId>zkclient</artifactId>     <version>0.9</version> </dependency> 

Code:

import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection;  import java.util.Properties;  import kafka.admin.AdminUtils; import kafka.admin.RackAwareMode; import kafka.utils.ZKStringSerializer$; import kafka.utils.ZkUtils;  public class KafkaJavaExample {    public static void main(String[] args) {     String zookeeperConnect = "zkserver1:2181,zkserver2:2181";     int sessionTimeoutMs = 10 * 1000;     int connectionTimeoutMs = 8 * 1000;      String topic = "my-topic";     int partitions = 2;     int replication = 3;     Properties topicConfig = new Properties(); // add per-topic configurations settings here      // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then     // createTopic() will only seem to work (it will return without error).  The topic will exist in     // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the     // topic.     ZkClient zkClient = new ZkClient(         zookeeperConnect,         sessionTimeoutMs,         connectionTimeoutMs,         ZKStringSerializer$.MODULE$);      // Security for Kafka was added in Kafka 0.9.0.0     boolean isSecureKafkaCluster = false;      ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);     AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);     zkClient.close();   }  } 
like image 106
Michael G. Noll Avatar answered Sep 22 '22 05:09

Michael G. Noll