In a Spring Boot application I'm using a class annotated with @KafkaListener as a message listener. I want to add a ConsumerRebalanceListener to my application to manage cached data on a rebalance.
How do I add a ConsumerRebalanceListener to a ConcurrentKafkaListenerContainerFactory? The documentation says that it should be set on a ContainerProperties object, but it's not clear how to access that object in order to set it. Additionally, it looks like the ConcurrentKafkaListenerContainerFactory throws away the rebalance listener, since it creates a new ContainerProperties object when creating a listener container instance.
I feel like I'm missing something really obvious here. Before this commit there was a method to set the rebalance listener directly on the ConcurrentKafkaListenerContainerFactory.
Consider using this method on the ConcurrentKafkaListenerContainerFactory:
/**
 * Obtain the properties template for this factory - set properties as needed
 * and they will be copied to a final properties instance for the endpoint.
 * @return the properties.
 */
public ContainerProperties getContainerProperties() {
This is where you can add your ConsumerRebalanceListener. Autowire the auto-configured ConcurrentKafkaListenerContainerFactory and set the listener on its container properties:
@Autowired
private ConcurrentKafkaListenerContainerFactory containerFactory;

@PostConstruct
public void init() {
    this.containerFactory.getContainerProperties()
            .setConsumerRebalanceListener(myConsumerRebalanceListener());
}

@Bean
public ConsumerRebalanceListener myConsumerRebalanceListener() {
    return new ConsumerRebalanceListener() {
        ...
    };
}
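As for what the listener body might do with cached data, here is a minimal sketch of a per-partition cache. Everything in it is an assumption for illustration: PartitionCache, its Partition record (a stand-in for Kafka's TopicPartition so the sketch compiles without the Kafka client on the classpath), and the method names are hypothetical and not part of Spring Kafka or the Kafka client. The idea is that onPartitionsRevoked would call evict(...) and onPartitionsAssigned would call reset(...), after mapping the TopicPartition collection to whatever key type you use.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical per-partition cache; not part of Spring Kafka or the Kafka client. */
final class PartitionCache {

    /** Stand-in for Kafka's TopicPartition so this sketch needs no Kafka dependency. */
    record Partition(String topic, int partition) {}

    // One inner map of cached values per partition currently being tracked.
    private final Map<Partition, Map<String, String>> entries = new ConcurrentHashMap<>();

    /** Cache a value under the partition its record came from. */
    void put(Partition p, String key, String value) {
        entries.computeIfAbsent(p, ignored -> new ConcurrentHashMap<>()).put(key, value);
    }

    /** Returns the cached value, or null if the partition or key is not cached. */
    String get(Partition p, String key) {
        Map<String, String> perPartition = entries.get(p);
        return perPartition == null ? null : perPartition.get(key);
    }

    /** Intended for onPartitionsRevoked: drop data for partitions this consumer lost. */
    void evict(Collection<Partition> revoked) {
        revoked.forEach(entries::remove);
    }

    /** Intended for onPartitionsAssigned: start each assigned partition with an empty cache. */
    void reset(Collection<Partition> assigned) {
        assigned.forEach(p -> entries.put(p, new ConcurrentHashMap<>()));
    }

    /** Number of partitions that currently have a cache entry. */
    int cachedPartitionCount() {
        return entries.size();
    }
}
```

Keying the cache by partition keeps the rebalance handling cheap: only the partitions named in the callback are touched, and data for partitions the consumer keeps is left alone.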