I'm writing an application to perform a bunch of operations on a kafka topic through the Java API. I'm able to create topic and add partitions. I need help with fetching topic metadata (such as partitions, brokers) and configurations and updating the configurations.
For reference, I want to update the Topic Level configurations provided here -https://kafka.apache.org/documentation#configuration such as cleanup.policy, compression.type etc
You could use code below to print the topic-level configs. Usage of updating config is similar.
String[] args = {"--zookeeper", "localhost:2181", "--entity-type", "topics", "--entity-name", "test", "--describe"};
ConfigCommand.main(args);
As for getting metadata, please refer to Finding the Lead Broker for a Topic and Partition in https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example:
ADDED: Add config fetch&update example using AdminUtils:
ZkUtils zkUtils = ZkUtils.apply("localhost:2181/k1", 6000, 10000, JaasUtils.isZkSecurityEnabled());
Properties pp = new Properties();
pp.setProperty("delete.retention.ms", "3000000");
pp.setProperty("file.delete.delay.ms", "40000");
AdminUtils.changeTopicConfig(zkUtils, "test", pp);
Properties p = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), "test");
System.out.println(p);
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With