I need to configure the retention policy of a particular topic when it is created. I looked for a solution but could only find the command-line alter command below:
./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config retention.ms=1680000
Can someone tell me how to configure it at creation time, for example through an XML or properties file configuration in Spring MVC?
Spring Kafka lets you create new topics by declaring @Beans in your application context. This requires a bean of type KafkaAdmin in the application context, which is created automatically if you are using Spring Boot. You could define your topic as follows:
@Bean
public NewTopic myTopic() {
    return TopicBuilder.name("my-topic")
            .partitions(4)
            .replicas(3)
            .config(TopicConfig.RETENTION_MS_CONFIG, "1680000")
            .build();
}
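The retention.ms value is a number of milliseconds passed as a string, which is easy to mistype. As a minimal sketch (the class and method names here are made up for illustration and are not part of Spring Kafka or the Kafka client), the value could be derived from a java.time.Duration instead of being hard-coded:

```java
import java.time.Duration;

class RetentionValue {
    // Hypothetical helper: turns a Duration into the string form
    // that a topic config such as retention.ms expects.
    static String retentionMs(Duration retention) {
        return Long.toString(retention.toMillis());
    }

    public static void main(String[] args) {
        // 28 minutes is the value used in the bean above
        System.out.println(retentionMs(Duration.ofMinutes(28))); // prints 1680000
        System.out.println(retentionMs(Duration.ofDays(7)));     // prints 604800000
    }
}
```

The result could then be passed as the second argument of .config(TopicConfig.RETENTION_MS_CONFIG, ...) in the bean definition.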
If you are not using Spring Boot, you'll additionally have to define the KafkaAdmin bean:
@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return new KafkaAdmin(configs);
}
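Since the question asks about a properties file, one option is to keep the topic-level settings outside the code and read them at startup. The sketch below uses only java.util.Properties; the "topic.config." prefix, the class name, and the inline file content are assumptions made for this example, not a Spring or Kafka convention:

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

class TopicConfigLoader {
    // Hypothetical prefix marking topic-level settings in the properties file
    static final String PREFIX = "topic.config.";

    // Collects every "topic.config.<name>" entry into a <name> -> value map
    static Map<String, String> topicConfigs(Properties props) {
        Map<String, String> configs = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(PREFIX)) {
                configs.put(key.substring(PREFIX.length()), props.getProperty(key));
            }
        }
        return configs;
    }

    public static void main(String[] args) throws IOException {
        // Inline text stands in for a real file on disk or on the classpath
        String file = "topic.name=my-topic\n"
                + "topic.config.retention.ms=1680000\n"
                + "topic.config.cleanup.policy=delete\n";
        Properties props = new Properties();
        props.load(new StringReader(file));
        System.out.println(topicConfigs(props)); // prints {cleanup.policy=delete, retention.ms=1680000}
    }
}
```

The resulting map could then be handed to the topic definition, for instance through TopicBuilder's configs(Map) method in the NewTopic bean above, so that changing the retention only requires editing the properties file.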
If you want to edit the configuration of an existing topic, you'll have to use the AdminClient. Here's a snippet that changes retention.ms at the topic level:
Map<String, Object> config = new HashMap<>();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// try-with-resources closes the client once the update is done
try (AdminClient client = AdminClient.create(config)) {
    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "new-topic");
    // Update the retention.ms value
    ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1680000");
    AlterConfigOp op = new AlterConfigOp(retentionEntry, AlterConfigOp.OpType.SET);
    Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>(1);
    configs.put(resource, Arrays.asList(op));
    // The call is asynchronous; get() waits for the result and throws
    // InterruptedException or ExecutionException if the update fails
    client.incrementalAlterConfigs(configs).all().get();
}
The configuration can be applied automatically using this @PostConstruct method, which takes in the NewTopic beans.
// All NewTopic beans declared in the application context
@Autowired
private Set<NewTopic> topics;
@PostConstruct
public void reconfigureTopics() throws ExecutionException, InterruptedException {
    // kafkaBootstrapServers and log are assumed to be fields of the enclosing class
    try (final AdminClient adminClient = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers))) {
        adminClient.incrementalAlterConfigs(topics.stream()
                .filter(topic -> topic.configs() != null)
                .collect(Collectors.toMap(
                        topic -> new ConfigResource(ConfigResource.Type.TOPIC, topic.name()),
                        // Turn each config entry of the topic into a SET operation
                        topic -> topic.configs().entrySet()
                                .stream()
                                .map(e -> new ConfigEntry(e.getKey(), e.getValue()))
                                .peek(ce -> log.debug("configuring {} {} = {}", topic.name(), ce.name(), ce.value()))
                                .map(ce -> new AlterConfigOp(ce, AlterConfigOp.OpType.SET))
                                .collect(Collectors.toList())
                )))
                .all()
                .get();
    }
}
I guess you could use the AdminClient (https://kafka.apache.org/22/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html) for this. You can create an AdminClient instance in your application and use its create-topic or alter-config operations to manipulate topic configurations, including retention.