Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Testing a @KafkaListener using Spring Embedded Kafka

Tags:

I am trying to write a unit test for a Kafka listener that I am developing using Spring Boot 2.x. Being a unit test, I don't want to start up a full Kafka server an instance of Zookeeper. So, I decided to use Spring Embedded Kafka.

The definition of my listener is very basic.

@Component
public class Listener {
    private final CountDownLatch latch;

    @Autowired
    public Listener(CountDownLatch latch) {
        this.latch = latch;
    }

    @KafkaListener(topics = "sample-topic")
    public void listen(String message) {
        latch.countDown();
    }
}

Also the test, that verifies the latch counter to be equal to zero after receiving a message, is very easy.

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "sample-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ListenerTest {

    @Autowired
    private KafkaEmbedded embeddedKafka;

    @Autowired
    private CountDownLatch latch;

    private KafkaTemplate<Integer, String> producer;

    @Before
    public void setUp() {
        this.producer = buildKafkaTemplate();
        this.producer.setDefaultTopic("sample-topic");
    }

    private KafkaTemplate<Integer, String> buildKafkaTemplate() {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
        return new KafkaTemplate<>(pf);
    }

    @Test
    public void listenerShouldConsumeMessages() throws InterruptedException {
        // Given
        producer.sendDefault(1, "Hello world");
        // Then
        assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
    }
}

Unfortunately, the test fails and I cannot understand why. Is it possible to use an instance of KafkaEmbedded to test a method marked with the annotation @KafkaListener?

All the code is shared in my GitHub repository kafka-listener.

Thanks to all.

like image 973
riccardo.cardin Avatar asked May 01 '18 20:05

riccardo.cardin


People also ask

How do you test a producer consumer Kafka?

Testing a Kafka Consumer Consuming data from Kafka consists of two main steps. Firstly, we have to subscribe to topics or assign topic partitions manually. Secondly, we poll batches of records using the poll method. The polling is usually done in an infinite loop.

How do I test a Kafka based application?

To test Kafka APIs, you use the API Connection test step. To add it to a test case, you will need a ReadyAPI Test Pro license. If you do not have it, try a ReadyAPI trial.

What is spring Kafka test?

Spring Kafka Test is a Java Archive File that contains some helpful utilities to test your application. This jar has some useful methods for getting results and static methods for setting up the consumer/producer.

What is embedded Kafka?

A library that provides an in-memory Kafka instance to run your tests against. Inspired by kafka-unit.


2 Answers

You are probably sending the message before the consumer has been assigned the topic/partition. Set property...

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest

...it defaults to latest.

This is like using --from-beginning with the console consumer.

EDIT

Oh; you're not using boot's properties.

Add

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

EDIT2

BTW, you should probably also do a get(10L, TimeUnit.SECONDS) on the result of the template.send() (a Future<>) to assert that the send was successful.

EDIT3

To override the offset reset just for the test, you can do the same as what you did for the broker addresses:

@Value("${spring.kafka.consumer.auto-offset-reset:latest}")
private String reset;

...

    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.reset);

and

@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.consumer.auto-offset-reset=earliest"})

However, bear in mind that this property only applies the first time a group consumes. To always start at the end each time the app starts, you have to seek to the end during startup.

Also, I would recommend setting enable.auto.commit to false so that the container takes care of committing the offsets rather than just relying on the consumer client doing it on a time schedule.

like image 84
Gary Russell Avatar answered Sep 17 '22 14:09

Gary Russell


Maybe someone will find this useful. I had a similar problem. Locally tests were running (some checks were performed within Awaitility.waitAtMost) but in the Jenkins pipeline, tests were failing.

The solution was, like already mentioned in the most voted answer, setting auto-offset-reset=earliest. When tests are running, you can check if you set the configuration properly by looking into test logs. Spring outputs configuration for both producer and consumer

like image 27
Pawel Ryznar Avatar answered Sep 19 '22 14:09

Pawel Ryznar