I have a working prototype Spring Boot application which listens on a Kafka queue. Apart from the configuration in application.yml
, all that is required is a MessageListener
implementation annotated with @KafkaListener
.
Am now introducing Spring Integration, and to do so have configured these beans:
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
adapter(KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(container);
kafkaMessageDrivenChannelAdapter.setOutputChannel(receiver());
return kafkaMessageDrivenChannelAdapter;
}
@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
ContainerProperties properties = new ContainerProperties(this.topic);
// set more properties
return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = ...; // set proerties
return new DefaultKafkaConsumerFactory<>(props);
}
The application is not starting, and is throwing this error:
Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
- Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found bean 'consumerFactory'
Action:
Consider revisiting the conditions above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration.
This is even though I have defined a ConsumerFactory
bean.
Running in debug mode, it is apparent that Boot is loading a KafkaListenerEndpointContainer
bean to listen on the broker:
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Discovered coordinator localhost:9092 (id: 2147483999 rack: null) for group my_group.
Then:
KafkaAnnotationDrivenConfiguration matched:
- @ConditionalOnClass found required class 'org.springframework.kafka.annotation.EnableKafka'; @ConditionalOnMissingClass did not find unwanted class (OnClassCondition)
KafkaAnnotationDrivenConfiguration#kafkaListenerContainerFactory matched:
- @ConditionalOnMissingBean (names: kafkaListenerContainerFactory; SearchStrategy: all) did not find any beans (OnBeanCondition)
KafkaAnnotationDrivenConfiguration#kafkaListenerContainerFactoryConfigurer matched:
- @ConditionalOnMissingBean (types: org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer; SearchStrategy: all) did not find any beans (OnBeanCondition)
KafkaAnnotationDrivenConfiguration.EnableKafkaConfiguration matched:
- @ConditionalOnMissingBean (names: org.springframework.kafka.config.internalKafkaListenerAnnotationProcessor; SearchStrategy: all) did not find any beans (OnBeanCondition)
KafkaAutoConfiguration matched:
- @ConditionalOnClass found required class 'org.springframework.kafka.core.KafkaTemplate'; @ConditionalOnMissingClass did not find unwanted class (OnClassCondition)
KafkaAutoConfiguration#kafkaProducerFactory matched:
- @ConditionalOnMissingBean (types: org.springframework.kafka.core.ProducerFactory; SearchStrategy: all) did not find any beans (OnBeanCondition)
KafkaAutoConfiguration#kafkaProducerListener matched:
- @ConditionalOnMissingBean (types: org.springframework.kafka.support.ProducerListener; SearchStrategy: all) did not find any beans (OnBeanCondition)
KafkaAutoConfiguration#kafkaTemplate matched:
- @ConditionalOnMissingBean (types: org.springframework.kafka.core.KafkaTemplate; SearchStrategy: all) did not find any beans (OnBeanCondition)
I think what is happening is that the Spring Boot\Kafa auto configuration is clashing with the Spring Integration\Kafka setup. What is the correct way to resolve this?
Thanks
You can either use Boot's Consumer factory...
@Bean
public KafkaMessageListenerContainer<String, String> container(ConsumerFactory<String, String> cf) {
...
}
Or disable kafka auto configuration
@SpringBootApplication(exclude = KafkaAutoConfiguration.class)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With