Logo Questions Linux Laravel Mysql Ubuntu Git Menu

Simple embedded Kafka test example with spring boot

Edit FYI: working gitHub example

I was searching the internet and couldn't find a working and simple example of an embedded Kafka test.

My setup is:

  • Spring boot
  • Multiple @KafkaListener with different topics in one class
  • Embedded Kafka for test which is starting fine
  • Test with Kafkatemplate which is sending to topic but the @KafkaListener methods are not receiving anything even after a huge sleep time
  • No warnings or errors are shown, only info spam from Kafka in logs

Please help me. There are mostly over configured or overengineered examples. I am sure it can be done simple. Thanks, guys!

@Controller public class KafkaController {      private static final Logger LOG = getLogger(KafkaController.class);      @KafkaListener(topics = "test.kafka.topic")     public void receiveDunningHead(final String payload) {         LOG.debug("Receiving event with payload [{}]", payload);         //I will do database stuff here which i could check in db for testing     } } 

private static String SENDER_TOPIC = "test.kafka.topic";

@ClassRule public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);  @Test     public void testSend() throws InterruptedException, ExecutionException {          Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);          KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);         producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();         producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();         producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();         Thread.sleep(10000);     } 
like image 997
Yuna Braska Avatar asked Feb 12 '18 18:02

Yuna Braska

People also ask

How do you test a Kafka consumer Spring boot?

First, we start by decorating our test class with two pretty standard Spring annotations: The @SpringBootTest annotation will ensure that our test bootstraps the Spring application context. We also use the @DirtiesContext annotation, which will make sure this context is cleaned and reset between different tests.

How do you write test cases for Kafka listener?

If you want to write integration tests using EmbeddedKafka , then you can do something like this. Assume we have some KafkaListener , which accepts a RequestDto as a Payload . In your test class you should create a TestConfiguration in order to create producer beans and to autowire KafkaTemplate into your test.

What is embedded Kafka?

A library that provides an in-memory Kafka instance to run your tests against.

How do I test a Kafka based application?

To test Kafka APIs, you use the API Connection test step. To add it to a test case, you will need a ReadyAPI Test Pro license. If you do not have it, try a ReadyAPI trial.

1 Answers

Embedded Kafka tests work for me with below configs,

Annotation on test class

@EnableKafka @SpringBootTest(classes = {KafkaController.class}) // Specify @KafkaListener class if its not the same class, or not loaded with test config @EmbeddedKafka(     partitions = 1,      controlledShutdown = false,     brokerProperties = {         "listeners=PLAINTEXT://localhost:3333",          "port=3333" }) public class KafkaConsumerTest {     @Autowired     KafkaEmbedded kafkaEmbeded;      @Autowired     KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; 

Before annotation for setup method

@Before public void setUp() throws Exception {   for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {     ContainerTestUtils.waitForAssignment(messageListenerContainer,      kafkaEmbeded.getPartitionsPerTopic());   } } 

Note: I am not using @ClassRule for creating embedded Kafka rather auto-wiring
@Autowired embeddedKafka

@Test public void testReceive() throws Exception {      kafkaTemplate.send(topic, data); } 

Hope this helps!

Edit: Test configuration class marked with @TestConfiguration

@TestConfiguration public class TestConfig {  @Bean public ProducerFactory<String, String> producerFactory() {     return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbedded)); }  @Bean public KafkaTemplate<String, String> kafkaTemplate() {     KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());     kafkaTemplate.setDefaultTopic(topic);     return kafkaTemplate; } 

Now @Test method will autowire KafkaTemplate and use is to send message

kafkaTemplate.send(topic, data); 

Updated answer code block with above line

like image 150
donm Avatar answered Oct 12 '22 12:10
