Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

EmbeddedKafka not working cause an Scala error

I have a working spring-boot java-gradle-based service that can produce and consume Kafka messages. But I can't create an integration test using spring-kafka-test library with @EmbeddedKafka annotation or using @ClassRule way. In both ways, I end up with the same error (pointed below regarding Scala). If anybody has any clue about what could be happening behind the scenes it would be very helpful.

Spring boot version: 2.1.6.RELEASE Spring Kafka version: 2.2.7.RELEASE

Producer Config:

@EnableKafka
@Configuration
public class KafkaProducerConfig {

    private String bootstrapAddress;

    public KafkaProducerConfig(@Value("${spring.kafka.bootstrap-servers}") String bootstrapAddress) {
        this.bootstrapAddress = bootstrapAddress;
    }

    @Bean
    public ProducerFactory<String, Greeting> greetingProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
        return new KafkaTemplate<>(greetingProducerFactory());
    }
}

Producer Code:

@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Greeting> kafkaTemplate;

    @Value(value = "${kiosk.kafka.topic.greeting}")
    private String greetingTopicName;

    public void sendMessage(Greeting greeting) {

        ListenableFuture<SendResult<String, Greeting>> future = kafkaTemplate.send(greetingTopicName, greeting);

        future.addCallback(new ListenableFutureCallback<SendResult<String, Greeting>>() {

            @Override
            public void onSuccess(SendResult<String, Greeting> result) {
                System.out.println("Sent greeting=[" + greeting + "] with offset=[" + result.getRecordMetadata().offset() + "]");
            }
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("Unable to send greeting=[" + greeting + "] due to : " + ex.getMessage());
            }
        });
    }
}

properties file:

spring.kafka.consumer.auto-offset-reset=earliest
spring.embedded.kafka.brokers=localhost:9092
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.consumer.group-id=1

And I tried the test with both ways and the same error while starting the test:

Test class rule:

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
public class PKafkaProducerClassRuleTest {

    private static final Logger LOGGER =
        LoggerFactory.getLogger(PKafkaProducerClassRuleTest.class);

    private static String SENDER_TOPIC = "sender.t";

    static {
        System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
    }

    @Autowired
    private PinPadKafkaProducer sender;

    private KafkaMessageListenerContainer<String, String> container;

    private BlockingQueue<ConsumerRecord<String, String>> records;

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka =
        new EmbeddedKafkaRule(1, true, SENDER_TOPIC);

Test with EmbeddedKafka:

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1,
    topics = {
        "sender.t" })
public class PKafkaProducerEmbeddedKafkaTest {

    private static final Logger LOGGER =
        LoggerFactory.getLogger(PKafkaProducerEmbeddedKafkaTest.class);

    private static String SENDER_TOPIC = "sender.t";

    static {
        System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY,
            "spring.kafka.bootstrap-servers");
    }

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Autowired
    private PinPadKafkaProducer sender;

    @Test
    public void someTest() {
        sender.sendMessage(new Greeting("msgtest", "nametest"));
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<Integer, String> consumer = cf.createConsumer();
        this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, SENDER_TOPIC);
        ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
        assertThat(replies.count()).isGreaterThanOrEqualTo(1);
    }

}

Error in both scenarios:

19/11/2019 13:51:56.774+0100 ERROR o.s.t.c.TestContextManager [] - Caught exception while allowing TestExecutionListener [org.springframework.test.context.web.ServletTestExecutionListener@e9cee0d] to prepare test instance [com.goldcar.kiosk.PKafkaProducerEmbeddedKafkaTest@39271181]
java.lang.IllegalStateException: Failed to load ApplicationContext
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:125)
    at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:108)
    at org.springframework.test.context.web.ServletTestExecutionListener.setUpRequestContextIfNecessary(ServletTestExecutionListener.java:190)
    at org.springframework.test.context.web.ServletTestExecutionListener.prepareTestInstance(ServletTestExecutionListener.java:132)
    at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:246)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.createTest(SpringJUnit4ClassRunner.java:227)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner$1.runReflectiveCall(SpringJUnit4ClassRunner.java:289)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.methodBlock(SpringJUnit4ClassRunner.java:291)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:246)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
    at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
    at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
    at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:175)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:157)
    at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
    at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
    at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'embeddedKafka': Invocation of init method failed; nested exception is java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps;
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1778)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:405)
    at org.springframework.kafka.test.context.EmbeddedKafkaContextCustomizer.customizeContext(EmbeddedKafkaContextCustomizer.java:109)
    at org.springframework.boot.test.context.SpringBootContextLoader$ContextCustomizerAdapter.initialize(SpringBootContextLoader.java:300)
    at org.springframework.boot.SpringApplication.applyInitializers(SpringApplication.java:621)
    at org.springframework.boot.SpringApplication.prepareContext(SpringApplication.java:365)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:310)
    at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:119)
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99)
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:117)
    ... 49 common frames omitted
Caused by: java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps;
    at kafka.cluster.EndPoint$.<init>(EndPoint.scala:32)
    at kafka.cluster.EndPoint$.<clinit>(EndPoint.scala)
    at kafka.server.Defaults$.<init>(KafkaConfig.scala:68)
    at kafka.server.Defaults$.<clinit>(KafkaConfig.scala)
    at kafka.server.KafkaConfig$.<init>(KafkaConfig.scala:781)
    at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala)
    at kafka.utils.TestUtils$.createBrokerConfig(TestUtils.scala:234)
    at kafka.utils.TestUtils.createBrokerConfig(TestUtils.scala)
    at org.springframework.kafka.test.EmbeddedKafkaBroker.createBrokerProperties(EmbeddedKafkaBroker.java:239)
    at org.springframework.kafka.test.EmbeddedKafkaBroker.afterPropertiesSet(EmbeddedKafkaBroker.java:214)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1837)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1774)
    ... 58 common frames omitted
like image 592
agcr Avatar asked Dec 10 '25 15:12

agcr


2 Answers

It appears you have mis-matched versions of kafka jars on the classpath.

This will happen if you use a different kafka-clients version than the default set by Boot.

See this appendix for an example for how to override all kafka jar versions.

When you use spring-kafka-test (version 2.2.x) with the 2.1.x kafka-clients jar, you need to override certain transitive dependencies, as follows:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${spring.kafka.version}</version>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>${spring.kafka.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
        </exclusion>
    </exclusions>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.1</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.1</version>
    <classifier>test</classifier>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.1</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.1</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>

Note that when switching to scala 2.12 (recommended for 2.1.x and higher), the 2.11 version must be excluded from spring-kafka-test.

like image 104
Gary Russell Avatar answered Dec 13 '25 07:12

Gary Russell


Solution applying using @Gary Russell answer in gradle:

gradle.properties:

spring_kafka_test_version=2.2.7.RELEASE
kafka_test_version=2.1.1
kafka_clients_test_version=2.1.1

build.gradle:

implementation group: 'org.springframework.kafka', name: 'spring-kafka'
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: kafka_clients_test_version
implementation group: 'org.apache.kafka', name: 'kafka_2.12', version: kafka_test_version

testImplementation (group: 'org.springframework.kafka', name: 'spring-kafka-test', version: spring_kafka_test_version) {
    exclude module: 'kafka_2.11'
}
testImplementation group: 'org.apache.kafka', name: 'kafka_2.12', version: kafka_test_version, classifier: 'test'
testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: kafka_clients_test_version
like image 22
agcr Avatar answered Dec 13 '25 07:12

agcr



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!