How to configure Kafka topic retention policy during creation with Spring?

I need to configure the retention policy of a particular topic when it is created. Searching for a solution, I could only find the command-line alter command below:

./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config retention.ms=1680000

Can someone tell me how to configure it during creation, for example through an XML or properties file configuration in Spring MVC?

Rachan R K asked Jun 11 '19 08:06


2 Answers

Spring Kafka lets you create new topics by declaring NewTopic @Beans in your application context. This requires a bean of type KafkaAdmin in the application context, which is created automatically if you use Spring Boot. You could define your topic as follows:

@Bean
public NewTopic myTopic() {
    return TopicBuilder.name("my-topic")
            .partitions(4)
            .replicas(3)
            .config(TopicConfig.RETENTION_MS_CONFIG, "1680000")
            .build();
}

If you are not using Spring Boot, you'll additionally have to define the KafkaAdmin bean:

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
    return new KafkaAdmin(configs);
}
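
The retention.ms value passed to TopicBuilder above is a plain string of milliseconds (1680000 ms is 28 minutes), which is easy to get wrong when written by hand. As a minimal sketch, a small helper could derive that string from a java.time.Duration. The class name RetentionConfigs and its methods are hypothetical and not part of Spring Kafka or the Kafka client; rejecting non-positive durations is a choice of this sketch (Kafka itself accepts -1 to mean no time limit, which is not modelled here).

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical helper for illustration; not part of Spring Kafka or kafka-clients.
final class RetentionConfigs {

    // Same key as TopicConfig.RETENTION_MS_CONFIG in kafka-clients.
    static final String RETENTION_MS = "retention.ms";

    private RetentionConfigs() {
    }

    // Kafka topic configs are passed as strings, so render the duration in milliseconds.
    static String retentionMs(Duration retention) {
        if (retention.isNegative() || retention.isZero()) {
            // Assumption of this sketch: only positive retention periods are accepted.
            throw new IllegalArgumentException("retention must be positive: " + retention);
        }
        return Long.toString(retention.toMillis());
    }

    // Builds a single-entry config map keyed by retention.ms.
    static Map<String, String> retentionConfig(Duration retention) {
        return Map.of(RETENTION_MS, retentionMs(retention));
    }
}
```

With this in place, the topic bean could call .config(TopicConfig.RETENTION_MS_CONFIG, RetentionConfigs.retentionMs(Duration.ofMinutes(28))) instead of hard-coding "1680000".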

If you want to edit the configuration of an existing topic, you'll have to use the AdminClient. Here is a snippet that changes retention.ms at the topic level:

// Classes come from org.apache.kafka.clients.admin and org.apache.kafka.common.config
Map<String, Object> config = new HashMap<>();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// try-with-resources closes the client once the update has been applied
try (AdminClient client = AdminClient.create(config)) {
    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "new-topic");

    // Set retention.ms to the new value; other topic settings are left untouched
    ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "1680000");
    AlterConfigOp op = new AlterConfigOp(retentionEntry, AlterConfigOp.OpType.SET);

    Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>(1);
    configs.put(resource, Arrays.asList(op));

    // incrementalAlterConfigs (Kafka 2.3 or later) is asynchronous; get() blocks until
    // the broker responds and throws ExecutionException or InterruptedException on failure
    client.incrementalAlterConfigs(configs).all().get();
}

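To apply the configuration automatically at startup, you can use a @PostConstruct method that takes the NewTopic beans and applies their configs to the existing topics (kafkaBootstrapServers and log are assumed to be fields of the enclosing class):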


    @Autowired
    private Set<NewTopic> topics;

    @PostConstruct
    public void reconfigureTopics() throws ExecutionException, InterruptedException {

        try (final AdminClient adminClient = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers))) {
            adminClient.incrementalAlterConfigs(topics.stream()
                .filter(topic -> topic.configs() != null)
                .collect(Collectors.toMap(
                    topic -> new ConfigResource(ConfigResource.Type.TOPIC, topic.name()),
                    topic -> topic.configs().entrySet()
                        .stream()
                        .map(e -> new ConfigEntry(e.getKey(), e.getValue()))
                        .peek(ce -> log.debug("configuring {} {} = {}", topic.name(), ce.name(), ce.value()))
                        .map(ce -> new AlterConfigOp(ce, AlterConfigOp.OpType.SET))
                        .collect(Collectors.toList())
                )))
                .all()
                .get();
        }

    }
Sergi Almar answered Sep 17 '22 15:09



You could use the admin client (https://kafka.apache.org/22/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html) for this. Create an AdminClient instance in your application and use its topic creation or config alteration methods (for example createTopics) to manage topic configurations, including retention.

Levani Kokhreidze answered Sep 20 '22 15:09
