
Spring Kafka listener is looking for ContainerProperties in the wrong package

I am trying to use a Kafka listener in my Spring Boot application, but startup fails with the error below. The root cause appears to be that something is looking for ContainerProperties in the org.springframework.kafka.listener.config subpackage, whereas in the spring-kafka version on my classpath it lives in org.springframework.kafka.listener:

***************************
APPLICATION FAILED TO START
***************************

Description:

An attempt was made to call the method org.springframework.kafka.listener.AbstractMessageListenerContainer.getContainerProperties()Lorg/springframework/kafka/listener/config/ContainerProperties; but it does not exist. Its class, org.springframework.kafka.listener.AbstractMessageListenerContainer, is available from the following locations:

    jar:file:/Users/kumarman/.m2/repository/org/springframework/kafka/spring-kafka/2.2.0.RELEASE/spring-kafka-2.2.0.RELEASE.jar!/org/springframework/kafka/listener/AbstractMessageListenerContainer.class

It was loaded from the following location:

    file:/Users/kumarman/.m2/repository/org/springframework/kafka/spring-kafka/2.2.0.RELEASE/spring-kafka-2.2.0.RELEASE.jar


Action:

Correct the classpath of your application so that it contains a single, compatible version of org.springframework.kafka.listener.AbstractMessageListenerContainer

My dependency tree is:

lon2002619:wmt-service kumarman$ mvn dependency:tree | grep kafka
[INFO] +- org.springframework.kafka:spring-kafka:jar:2.2.0.RELEASE:compile
[INFO] |  \- org.apache.kafka:kafka-clients:jar:2.0.0:compile
[INFO] +- org.springframework.kafka:spring-kafka-test:jar:2.2.0.RELEASE:test
[INFO] |  +- org.apache.kafka:kafka-clients:jar:test:2.0.0:test
[INFO] |  +- org.apache.kafka:kafka_2.11:jar:2.0.0:test
[INFO] |  \- org.apache.kafka:kafka_2.11:jar:test:2.0.0:test
[INFO] |  |     +- io.zipkin.brave:brave-instrumentation-kafka-clients:jar:5.1.0:compile
[INFO] |     +- io.zipkin.reporter2:zipkin-sender-kafka11:jar:2.7.3:compile

And the Kafka configuration code is:

@Configuration
public class KafkaConfiguration {

    @Autowired
    private KafkaProperties kafkaProperties;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumer().getGroupId());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getConsumer().getAutoOffsetReset());

        return props;
    }

    @Bean
    public ConsumerFactory<String, byte[]> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Stack trace after enabling TRACE logging:

2018-11-29 10:51:05.292 TRACE [kumarman-wmt-service,,,] 46241 --- [           main] o.s.c.io.support.SpringFactoriesLoader   : Loaded [org.springframework.boot.diagnostics.FailureAnalysisReporter] names: [org.springframework.boot.diagnostics.LoggingFailureAnalysisReporter]
2018-11-29 10:51:05.301 DEBUG [kumarman-wmt-service,,,] 46241 --- [           main] o.s.b.d.LoggingFailureAnalysisReporter   : Application failed to start due to an exception

java.lang.NoSuchMethodError: org.springframework.kafka.listener.AbstractMessageListenerContainer.getContainerProperties()Lorg/springframework/kafka/listener/config/ContainerProperties;
    at org.springframework.cloud.sleuth.instrument.messaging.SleuthKafkaAspect.wrapListenerContainerCreation(TraceMessagingAutoConfiguration.java:191) ~[spring-cloud-sleuth-core-2.0.0.RELEASE.jar!/:2.0.0.RELEASE]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_151]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_151]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_151]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_151]
    at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:644) ~[spring-aop-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:633) ~[spring-aop-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70) ~[spring-aop-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:93) ~[spring-aop-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688) ~[spring-aop-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory$$EnhancerBySpringCGLIB$$d4611e6.createListenerContainer(<generated>) ~[spring-kafka-2.2.0.RELEASE.jar!/:2.2.0.RELEASE]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.createListenerContainer(KafkaListenerEndpointRegistry.java:183) ~[spring-kafka-2.2.0.RELEASE.jar!/:2.2.0.RELEASE]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:155) ~[spring-kafka-2.2.0.RELEASE.jar!/:2.2.0.RELEASE]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.registerListenerContainer(KafkaListenerEndpointRegistry.java:129) ~[spring-kafka-2.2.0.RELEASE.jar!/:2.2.0.RELEASE]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.registerAllEndpoints(KafkaListenerEndpointRegistrar.java:164) ~[spring-kafka-2.2.0.RELEASE.jar!/:2.2.0.RELEASE]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistrar.afterPropertiesSet(KafkaListenerEndpointRegistrar.java:158) ~[spring-kafka-2.2.0.RELEASE.jar!/:2.2.0.RELEASE]
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(KafkaListenerAnnotationBeanPostProcessor.java:259) ~[spring-kafka-2.2.0.RELEASE.jar!/:2.2.0.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:863) ~[spring-beans-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863) ~[spring-context-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546) ~[spring-context-5.1.2.RELEASE.jar!/:5.1.2.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140) ~[spring-boot-2.1.0.RELEASE.jar!/:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775) [spring-boot-2.1.0.RELEASE.jar!/:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.1.0.RELEASE.jar!/:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:316) [spring-boot-2.1.0.RELEASE.jar!/:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) [spring-boot-2.1.0.RELEASE.jar!/:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) [spring-boot-2.1.0.RELEASE.jar!/:2.1.0.RELEASE]
    at com.betstars.wmt.app.WmtServiceApplication.main(WmtServiceApplication.java:14) [classes!/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_151]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_151]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_151]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_151]
    at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48) [wmt-service-1.0.0-SNAPSHOT.jar:na]
    at org.springframework.boot.loader.Launcher.launch(Launcher.java:87) [wmt-service-1.0.0-SNAPSHOT.jar:na]
    at org.springframework.boot.loader.Launcher.launch(Launcher.java:50) [wmt-service-1.0.0-SNAPSHOT.jar:na]
    at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:51) [wmt-service-1.0.0-SNAPSHOT.jar:na]

POM extract:

<spring.boot.version>2.1.0.RELEASE</spring.boot.version>
<spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
<!-- Zipkin - tracing -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>

krmanish007 asked Oct 17 '22 10:10


1 Answer

Perhaps you are referencing it in your code?

ContainerProperties was moved from the org.springframework.kafka.listener.config package to org.springframework.kafka.listener in spring-kafka 2.2.

https://github.com/spring-projects/spring-kafka/commit/b048aaa8f00045e4cc65d5d137b1aa372beca3a2#diff-7121e19c8f33f6bddfd42b749f0bddb0

See the "What's New" section of the spring-kafka reference documentation.
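
To see which of the two locations is actually present at runtime, a small probe like the one below can help. It is only a sketch: ClasspathProbe and its method names are hypothetical, and it relies on nothing beyond the JDK. Run it with the same classpath as the failing application.

```java
import java.security.CodeSource;

public class ClasspathProbe {

    /** Returns true if the named class can be located on the current classpath. */
    public static boolean isPresent(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Returns the jar or directory a class was loaded from, or a short note if unavailable. */
    public static String locationOf(String className) {
        try {
            Class<?> type = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            return source != null ? String.valueOf(source.getLocation()) : "unknown (no code source)";
        } catch (ClassNotFoundException | LinkageError e) {
            return "not on classpath";
        }
    }

    public static void main(String[] args) {
        String[] candidates = {
            "org.springframework.kafka.listener.config.ContainerProperties", // location before 2.2
            "org.springframework.kafka.listener.ContainerProperties"         // location from 2.2 on
        };
        for (String name : candidates) {
            System.out.println(name + " -> " + locationOf(name));
        }
    }
}
```

If only the second name resolves, any jar that was compiled against the first one will fail at link time, whatever your own code references.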

EDIT

I was able to reproduce your issue with the following contrived example:

Compile this class against spring-kafka 2.0...

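import org.springframework.kafka.listener.KafkaMessageListenerContainer;

public class Foo {

    // Compiled against 2.0, this call is bound to the old return type
    // org.springframework.kafka.listener.config.ContainerProperties.
    public void referenceOldClass(KafkaMessageListenerContainer<String, String> container) {
        container.getContainerProperties();
    }

}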

Then reference that class from a Boot 2.1 application:

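import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;

@SpringBootApplication
public class So53503830Application {

    public static void main(String[] args) {
        SpringApplication.run(So53503830Application.class, args);
    }

    @Bean
    public Foo foo() {
        return new Foo();
    }

    @Autowired
    private ConsumerFactory<String, String> cf;

    @Bean
    public ApplicationRunner runner(Foo foo) {
        return args -> {
            // Built against spring-kafka 2.2, so ContainerProperties comes from the new package
            ContainerProperties props = new ContainerProperties("topic");
            KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(this.cf, props);
            foo.referenceOldClass(container);
        };
    }

}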

With DEBUG logging, we get the stack trace...

java.lang.NoSuchMethodError: org.springframework.kafka.listener.KafkaMessageListenerContainer.getContainerProperties()Lorg/springframework/kafka/listener/config/ContainerProperties;
    at com.example.Foo.referenceOldClass(Foo.java:29) ~[classes/:na]
    at com.example.So53503830Application.lambda$0(So53503830Application.java:32) [classes/:na]
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:804) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:794) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:324) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at com.example.So53503830Application.main(So53503830Application.java:16) [classes/:na]

2018-11-28 15:02:56.962 ERROR 95564 --- [           main] o.s.b.d.LoggingFailureAnalysisReporter   : 

***************************
APPLICATION FAILED TO START
***************************

Description:

An attempt was made to call the method org.springframework.kafka.listener.KafkaMessageListenerContainer.getContainerProperties()Lorg/springframework/kafka/listener/config/ContainerProperties; but it does not exist. Its class, org.springframework.kafka.listener.KafkaMessageListenerContainer, is available from the following locations:

    jar:file:/Users/grussell/.m2/repository/org/springframework/kafka/spring-kafka/2.2.0.RELEASE/spring-kafka-2.2.0.RELEASE.jar!/org/springframework/kafka/listener/KafkaMessageListenerContainer.class

It was loaded from the following location:

    file:/Users/grussell/.m2/repository/org/springframework/kafka/spring-kafka/2.2.0.RELEASE/spring-kafka-2.2.0.RELEASE.jar


Action:

Correct the classpath of your application so that it contains a single, compatible version of org.springframework.kafka.listener.KafkaMessageListenerContainer
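
The reason a method that plainly exists is reported as missing is that the JVM links a call by method name plus descriptor, and the descriptor includes the return type. The text after the closing parenthesis in the error message is that return type in internal form. The sketch below decodes it; DescriptorReturnType is a hypothetical helper written for illustration and only handles object return types.

```java
public class DescriptorReturnType {

    /**
     * Extracts the return type from a method reference as printed in a
     * NoSuchMethodError, for example "pkg.Type.method()Lpkg/Other;".
     * Object types ("L...;") are converted to dotted class names;
     * anything else (primitives, arrays, void) is returned unchanged.
     */
    public static String returnTypeOf(String methodReference) {
        int close = methodReference.lastIndexOf(')');
        if (close < 0) {
            throw new IllegalArgumentException("No method descriptor in: " + methodReference);
        }
        String returnDescriptor = methodReference.substring(close + 1);
        if (returnDescriptor.startsWith("L") && returnDescriptor.endsWith(";")) {
            // Drop the leading 'L' and trailing ';', then switch '/' to '.'
            return returnDescriptor.substring(1, returnDescriptor.length() - 1).replace('/', '.');
        }
        return returnDescriptor;
    }

    public static void main(String[] args) {
        String fromError = "org.springframework.kafka.listener.AbstractMessageListenerContainer"
                + ".getContainerProperties()Lorg/springframework/kafka/listener/config/ContainerProperties;";
        // prints org.springframework.kafka.listener.config.ContainerProperties
        System.out.println(returnTypeOf(fromError));
    }
}
```

The decoded type is the one the caller was compiled against. The topmost application or library frame under the NoSuchMethodError identifies that caller: Foo in the reproduction above, and SleuthKafkaAspect from spring-cloud-sleuth-core-2.0.0.RELEASE in the question's trace, which suggests that jar was built against a spring-kafka version older than 2.2.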

Gary Russell answered Oct 21 '22 08:10