I have a spring boot kafka application. My brokers are recycled every few days. The old brokers are deprovisioned and new brokers are provisioned.
I have a scheduler which is checking for brokers every few hours. I would like to make sure as soon as the we have new brokers, we should reload all the Spring Kafka related beans. Very similar to KafkaAutoConfiguration except I want a trigger on broker value change and load the auto configuration programmatically.
How do I call the auto configure programmatically whenever the old brokers are replaced with new one ?
Auto-Configuration in Spring Boot The annotation @EnableAutoConfiguration is used to enable the auto-configuration feature. The @EnableAutoConfiguration annotation enables the auto-configuration of Spring ApplicationContext by scanning the classpath components and registering the beans.
In order to create a custom auto-configuration, we need to create a class annotated as @Configuration and register it. Let's create a custom configuration for a MySQL data source: @Configuration public class MySQLAutoconfiguration { //... } Next, we need to register the class as an auto-configuration candidate.
Spring @Configuration annotation is part of the spring core framework. Spring Configuration annotation indicates that the class has @Bean definition methods. So Spring container can process the class and generate Spring Beans to be used in the application.
You need to opt-in to auto-configuration by adding the @EnableAutoConfiguration or @SpringBootApplication annotations to one of your @Configuration classes. You should only ever add one @EnableAutoConfiguration annotation. We generally recommend that you add it to your primary @Configuration class.
Your requirements sounds like Config Server in Spring Cloud:https://cloud.spring.io/spring-cloud-static/Greenwich.SR2/multi/multi__spring_cloud_config_2.html#_spring_cloud_config_2 with its @RefreshScope
feature: https://cloud.spring.io/spring-cloud-static/Greenwich.SR2/multi/multi__spring_cloud_context_application_context_services.html#refresh-scope.
So, you need to specify your own beans and mark them with that annotation:
@Bean
@RefreshScope
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());
}
@Bean
@RefreshScope
public ProducerFactory<?, ?> kafkaProducerFactory() {
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
this.properties.buildProducerProperties());
String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
if (transactionIdPrefix != null) {
factory.setTransactionIdPrefix(transactionIdPrefix);
}
return factory;
}
These two beans rely on the configuration properties for connection to Apache Kafka broker and that is really fully enough to have them refreshable. Whenever a ContextRefreshedEvent
happens these beans are going to be re-initialized with a fresh configuration properties.
I think the ConsumerFactory
consumers (MessageListenerContainer
and KafkaListenerEndpointRegistry
) have to be restarted on that event as well. The point is that MessageListenerContainer
starts a long-living process and therefore caches a KafkaConsumer
instance for the poll
purposes.
All the ProducerFactory
consumers don't need to be restarted. Even if KafkaProducer
is cached in the DefaultKafkaProducerFactory
it is going to be reinitialized during @RefreshScope
phase.
UPDATE
I don’t use config server. I get the new hosts from consul catalog service.
Right, I didn't say that you use a Config Server. That just looks for me similar way. So, from big height I would really take a look into a Config Client implementation for your Consul catalog solution.
Nevertheless you still can emit a RefreshEvent
which will trigger all your @RefreshScope
'd beans to be reloaded. For that purpose you need to implement an ApplicationEventPublisherAware
and emit that event whenever you have update from Consul. Remember: Kafka listener containers must be restarted. For that purpose you can listen for the RefreshScopeRefreshedEvent
since you really are interested in the restart only when all the @RefreshScope
have been refreshed.
More about refresh scope: https://gist.github.com/dsyer/a43fe5f74427b371519af68c5c4904c7
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With