Logo Questions Linux Laravel Mysql Ubuntu Git Menu

How to write Unit test for @KafkaListener?

Trying to figure out if I can write unit test for @KafkaListener using spring-kafka and spring-kafka-test.

My Listener class.

    public class MyKafkaListener {
    private MyMessageProcessor myMessageProcessor;

    @KafkaListener(topics = "${kafka.topic.01}", groupId = "SF.CLIENT", clientIdPrefix = "SF.01", containerFactory = "myMessageListenerContainerFactory")
    public void myMessageListener(MyMessage message) {
        log.info("MyMessage processed");

My Test class :

    @EmbeddedKafka(partitions = 1, topics = {"I1.Topic.json.001"})
    @ContextConfiguration(classes = {TestKafkaConfig.class})
    public class MyMessageConsumersTest {

    private MyMessageProcessor myMessageProcessor;

    private String TOPIC_01;

    private KafkaTemplate<String, MyMessage> messageProducer;

    public void testSalesforceMessageListner() {
        MyMessageConsumers myMessageConsumers = new MyMessageConsumers(mockService);
        messageProducer.send(TOPIC_01, "MessageID", new MyMessage());
        verify(myMessageProcessor, times(1)).process(any(MyMessage.class));

My Test config class :

    public class TestKafkaConfig {
    public MyMessageProcessor myMessageProcessor() {
        return mock(MyMessageProcessor.class);
    public KafkaEmbedded kafkaEmbedded() {
        return new KafkaEmbedded(1, true, 1, "I1.Topic.json.001");

    public ConsumerFactory<String, MyMessage> myMessageConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(MyMessage.class));
    public ConcurrentKafkaListenerContainerFactory<String, MyMessage> myMessageListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MyMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        return factory;

    public ProducerFactory<String, MyMessage> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaMessageSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    public KafkaTemplate<String, MyMessage> messageProducer() {
        return new KafkaTemplate<>(producerFactory());

Is there any simple way to make this work ?

Or should I do the testing of @KafkaListener in some other way ? In unit test, how do I ensure @KafkaListener is invoked when a new message is arrived in Kafka.

like image 560
Abbin Varghese Avatar asked Oct 12 '18 15:10

Abbin Varghese

People also ask

How do you write a unit test for Kafka consumer?

Testing a Kafka Consumer Consuming data from Kafka consists of two main steps. Firstly, we have to subscribe to topics or assign topic partitions manually. Secondly, we poll batches of records using the poll method. The polling is usually done in an infinite loop.

How do I test Kafka messages?

To test Kafka APIs, you use the API Connection test step. To add it to a test case, you will need a ReadyAPI Test Pro license. If you do not have it, try a ReadyAPI trial.

How do you test a Kafka Consumer spring boot?

First, we start by decorating our test class with two pretty standard Spring annotations: The @SpringBootTest annotation will ensure that our test bootstraps the Spring application context. We also use the @DirtiesContext annotation, which will make sure this context is cleaned and reset between different tests.

How do I ensure @kafkalistener is invoked when a message arrives?

how do I ensure @KafkaListener is invoked when a new message is arrived in Kafka. Well, this is essentially a Framework responsibility to test such a functionality. In your case you need just concentrate on the business logic and unit test exactly your custom code, but not that one compiled in the Framework.

How do I unit test a Kafka consumer?

Unit Testing Your Consumer Kafka unit tests of the Consumer code use MockConsumer object. The @Before will initialize the MockConsumer before each test. MockConsumer<String, String> consumer; @Before public void setUp () { consumer = new MockConsumer<String, String> (OffsetResetStrategy.EARLIEST); }

What config do I need to configure @kafkalistener and kafkatemplate?

You definitely have a minimal possible config for the @KafkaListener and KafkaTemplate. Only what you need is to remove a @EmbeddedKafka do not start the broker twice. You can wrap the listener in your test case.

What is spring Kafka test?

What is Spring Kafka Test? The Spring Kafka project comes with a spring-kafka-test JAR that contains a number of useful utilities to assist you with your application unit testing. These include an embedded Kafka broker, some static methods to setup consumers/producers and utility methods to fetch results.

1 Answers

how do I ensure @KafkaListener is invoked when a new message is arrived in Kafka.

Well, this is essentially a Framework responsibility to test such a functionality. In your case you need just concentrate on the business logic and unit test exactly your custom code, but not that one compiled in the Framework. In addition there is not goo point to test the @KafkaListener method which just logs incoming messages. It is definitely going to be too hard to find the hook for test-case verification.

On the other hand I really believe that business logic in your @KafkaListener method is much complicated than you show. So, it might be really better to verify your custom code (e.g. DB insert, some other service call etc.) called from that method rather than try to figure out the hook exactly for the myMessageListener().

What you do with the mock(MyMessageProcessor.class) is really a good way for business logic verification. Only what is wrong in your code is about that duplication for the EmbeddedKafka: you use an annotation and you also declare a @Bean in the config. You should think about removing one of them. Although it isn't clear where is your production code, which is really free from the embedded Kafka. Otherwise, if everything is in the test scope, I don't see any problems with your consumer and producer factories configuration. You definitely have a minimal possible config for the @KafkaListener and KafkaTemplate. Only what you need is to remove a @EmbeddedKafka do not start the broker twice.

like image 143
Artem Bilan Avatar answered Oct 08 '22 10:10

Artem Bilan