
How to start Kafka listener manually?

I have some methods annotated with @KafkaListener but I want to start only some of them manually (depending on some conditions).

@KafkaListener(id = "consumer1", topics = "topic-name", clientIdPrefix = "client-prefix", autoStartup = "false")
public void consumer1(String message) {
    // consume
}

@PostConstruct
private void startConsumers() {
    if (true) { // placeholder for the real start condition
        kafkaListenerEndpointRegistry.getListenerContainer("consumer1").start();
    }
}

But at this point kafkaListenerEndpointRegistry.getListenerContainers() returns an empty list and kafkaListenerEndpointRegistry.getListenerContainer("consumer1") returns null. So the @PostConstruct method is probably called too early, before the listeners are registered. When I annotated startConsumers() with @Scheduled(fixedDelay = 100) instead, the listeners were already available. But @Scheduled is not a good fit for something I want to run only once, after the application has started.

micobg asked Dec 22 '22 19:12


1 Answer

You can't do it in @PostConstruct - it's too early in the application context life cycle.

Implement SmartLifecycle, set the phase to Integer.MAX_VALUE, and start the container in the start() method.
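
A minimal sketch of the SmartLifecycle approach, assuming Spring Framework 5 or later (where isAutoStartup() and stop(Runnable) have default implementations) and spring-kafka on the classpath. The class name ConditionalListenerStarter and the helper shouldStartConsumer1() are hypothetical; only the listener id "consumer1" comes from the question.

```java
import org.springframework.context.SmartLifecycle;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

// Hypothetical bean name; starts selected listener containers late in context startup.
@Component
public class ConditionalListenerStarter implements SmartLifecycle {

    private final KafkaListenerEndpointRegistry registry;
    private volatile boolean running;

    public ConditionalListenerStarter(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    @Override
    public void start() {
        // By the time lifecycle beans are started, the @KafkaListener containers are registered.
        MessageListenerContainer container = registry.getListenerContainer("consumer1");
        if (container != null && shouldStartConsumer1()) {
            container.start();
        }
        running = true;
    }

    @Override
    public void stop() {
        // Nothing to stop here; the registry stops its own containers on shutdown.
        running = false;
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    @Override
    public int getPhase() {
        // Highest phase: this bean is started last.
        return Integer.MAX_VALUE;
    }

    // Hypothetical placeholder for whatever condition decides if the listener should run.
    private boolean shouldStartConsumer1() {
        return true;
    }
}
```

The listener itself keeps autoStartup = "false", so the container stays stopped unless this bean starts it.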

Or use an @EventListener and listen for the ApplicationStartedEvent (if using Spring Boot) or ContextRefreshedEvent for a non-Boot Spring application.
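
The event-listener alternative might look like the sketch below, assuming Spring Boot 2.0 or later (where ApplicationStartedEvent is available). The class name ConsumerStartupListener and the helper shouldStartConsumer1() are hypothetical; in a non-Boot application the event type would be ContextRefreshedEvent instead.

```java
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

// Hypothetical bean name; reacts once the application context has started.
@Component
public class ConsumerStartupListener {

    private final KafkaListenerEndpointRegistry registry;

    public ConsumerStartupListener(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    @EventListener
    public void startConsumers(ApplicationStartedEvent event) {
        // The registry is populated by now, so the lookup no longer returns null
        // for a registered listener id.
        MessageListenerContainer container = registry.getListenerContainer("consumer1");
        if (container != null && shouldStartConsumer1()) {
            container.start();
        }
    }

    // Hypothetical placeholder for the real start condition.
    private boolean shouldStartConsumer1() {
        return true;
    }
}
```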

Gary Russell answered Jan 03 '23 14:01