I'm trying to start my KafkaListener only when a flag is set to true.
@Component
public class KafkaTopicConsumer {

    // Somehow wrap the listener so it only starts when a property value is set to true
    @KafkaListener(topics = "#{@consumerTopic}", groupId = "#{@groupName}")
    public void consumeMessage(ConsumerRecord<String, String> message) throws IOException {
        logger.info("Consumed message from topic: {} with message: {}", message.topic(), message);
    }
}
Is there a way to make sure the listener is started only when a property such as start.consumer is set to true? I don't want the listener to start every time the application starts, only when I explicitly ask for it. Is there a good way to approach this use case?
First, set autoStartup to false and give the listener an id so you can look up its container later. Then start the container manually, based on the flag, from an @EventListener method.
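@Component
public class KafkaTopicConsumer {

    // SLF4J logger (assumed; the original snippet does not show its declaration)
    private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumer.class);

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    // Falls back to false when start.consumer is not set
    @Value("${start.consumer:false}")
    private boolean shouldStart;

    // autoStartup = "false" keeps the container stopped until it is started explicitly
    @KafkaListener(id = "myListener", autoStartup = "false", topics = "#{@consumerTopic}", groupId = "#{@groupName}")
    public void consumeMessage(ConsumerRecord<String, String> message) throws IOException {
        logger.info("Consumed message from topic: {} with message: {}", message.topic(), message);
    }

    // Runs once the application context has started
    @EventListener
    public void onStarted(ApplicationStartedEvent event) {
        if (shouldStart) {
            MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("myListener");
            if (listenerContainer != null) {
                listenerContainer.start();
            }
        }
    }
}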
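Note: listening for ApplicationStartedEvent ensures the listener containers have already been registered by the time you look them up. @PostConstruct runs while the bean is still being initialized, so at that point the registry most likely won't know about the container yet.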
EDIT:
Added the actual reading of the property using the @Value annotation.
Note: This approach has the added flexibility of allowing the start and stop methods to also be called dynamically (using JMX for example) with just a few changes. This facilitates the scenario where we want to disable a consumer and enable it later without restarting the application.
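To make the dynamic start/stop idea a bit more concrete, here is a minimal, dependency-free sketch of the toggle logic. Note that Container, Registry, FakeContainer and ConsumerToggle are hypothetical stand-ins I made up so the example runs on its own; they are not Spring Kafka classes. In a real application the registry role would be played by KafkaListenerEndpointRegistry and the container by the MessageListenerContainer it returns.

```java
import java.util.HashMap;
import java.util.Map;

class ConsumerToggleSketch {

    // Hypothetical stand-in for the lifecycle methods a listener container exposes.
    interface Container {
        void start();
        void stop();
        boolean isRunning();
    }

    // Hypothetical stand-in for the registry: looks containers up by listener id.
    static class Registry {
        private final Map<String, Container> containers = new HashMap<>();

        void register(String id, Container container) {
            containers.put(id, container);
        }

        Container getListenerContainer(String id) {
            return containers.get(id);
        }
    }

    // In-memory container used only to make the sketch runnable.
    static class FakeContainer implements Container {
        private boolean running;

        @Override public void start() { running = true; }
        @Override public void stop() { running = false; }
        @Override public boolean isRunning() { return running; }
    }

    // The part you would expose as a JMX operation or an admin endpoint.
    static class ConsumerToggle {
        private final Registry registry;
        private final String listenerId;

        ConsumerToggle(Registry registry, String listenerId) {
            this.registry = registry;
            this.listenerId = listenerId;
        }

        // Starts the container; returns false if it is unknown or already running.
        boolean enable() {
            Container container = registry.getListenerContainer(listenerId);
            if (container == null || container.isRunning()) {
                return false;
            }
            container.start();
            return true;
        }

        // Stops the container; returns false if it is unknown or already stopped.
        boolean disable() {
            Container container = registry.getListenerContainer(listenerId);
            if (container == null || !container.isRunning()) {
                return false;
            }
            container.stop();
            return true;
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        FakeContainer container = new FakeContainer();
        registry.register("myListener", container);

        ConsumerToggle toggle = new ConsumerToggle(registry, "myListener");
        System.out.println(toggle.enable());   // true: was stopped, now running
        System.out.println(toggle.enable());   // false: already running
        System.out.println(toggle.disable());  // true: now stopped
    }
}
```

In the actual consumer you would inject KafkaListenerEndpointRegistry, call getListenerContainer("myListener") and use the start(), stop() and isRunning() methods of the returned container in the same way. How you expose enable and disable (JMX, an admin endpoint, something else) is up to you.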
Another good approach, as correctly stated in @Makoton's answer, is to use @ConditionalOnProperty. Just to note that in your example, you can use it with @Component instead of defining the @Bean manually.
@Component
@ConditionalOnProperty(
    value = "start.consumer",
    havingValue = "true")
public class KafkaTopicConsumer { // ...
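It all comes down to the level of flexibility you need: with @ConditionalOnProperty the bean is not created at all when the property is not set to true, so the listener cannot be started later without a restart, whereas the autoStartup approach keeps the container around so it can be started on demand.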