Edit, FYI: working GitHub example
I was searching the internet and couldn't find a working and simple example of an embedded Kafka test.
My setup is:
Please help me. Most of the examples I found are over-configured or over-engineered. I am sure it can be done simply. Thanks, guys!
@Controller
public class KafkaController {

    private static final Logger LOG = getLogger(KafkaController.class);

    @KafkaListener(topics = "test.kafka.topic")
    public void receiveDunningHead(final String payload) {
        LOG.debug("Receiving event with payload [{}]", payload);
        // I will do database stuff here, which I could check in the DB for testing
    }
}
private static String SENDER_TOPIC = "test.kafka.topic";

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);

@Test
public void testSend() throws InterruptedException, ExecutionException {
    Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
    KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
    producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();
    producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();
    producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();
    Thread.sleep(10000);
}
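The test above waits for the listener with a fixed Thread.sleep(10000). A common alternative is to let the listener count down a CountDownLatch and have the test await it with a timeout, so the test finishes as soon as all messages have arrived. The following is only a minimal sketch of that idea using plain java.util.concurrent: RecordingListener and LatchWaitDemo are hypothetical names, no Kafka or Spring is involved, and the background thread merely stands in for the listener container.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the @KafkaListener class (assumption: not part of the original code).
final class RecordingListener {
    private final CountDownLatch latch;
    private final List<String> received = new CopyOnWriteArrayList<>();

    RecordingListener(int expectedMessages) {
        this.latch = new CountDownLatch(expectedMessages);
    }

    // In a real test this would be the body of the @KafkaListener method.
    void receive(String payload) {
        received.add(payload);
        latch.countDown();
    }

    // Blocks until all expected messages have arrived or the timeout elapses.
    boolean awaitAll(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }

    List<String> received() {
        return received;
    }
}

final class LatchWaitDemo {
    // Delivers the payloads on a background thread, roughly as a listener container would.
    static RecordingListener deliverAsync(List<String> payloads) {
        RecordingListener listener = new RecordingListener(payloads.size());
        ExecutorService consumerThread = Executors.newSingleThreadExecutor();
        consumerThread.execute(() -> payloads.forEach(listener::receive));
        consumerThread.shutdown(); // the already submitted task still runs
        return listener;
    }

    public static void main(String[] args) throws InterruptedException {
        RecordingListener listener =
                deliverAsync(Arrays.asList("message00", "message01", "message10"));
        // Returns as soon as the third message is counted, instead of always sleeping 10 s.
        boolean allArrived = listener.awaitAll(10, TimeUnit.SECONDS);
        System.out.println("all arrived: " + allArrived + ", payloads: " + listener.received());
    }
}
```

In an actual Spring test, the latch would live in the listener bean (or a test double of it), and the test would assert that awaitAll returned true before checking the database.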
First, we start by decorating our test class with two pretty standard Spring annotations: The @SpringBootTest annotation will ensure that our test bootstraps the Spring application context. We also use the @DirtiesContext annotation, which will make sure this context is cleaned and reset between different tests.
If you want to write integration tests using EmbeddedKafka, then you can do something like this. Assume we have some KafkaListener, which accepts a RequestDto as a Payload. In your test class you should create a TestConfiguration in order to create producer beans and to autowire KafkaTemplate into your test.
A library that provides an in-memory Kafka instance to run your tests against.
Embedded Kafka tests work for me with the configuration below.
Annotations on the test class:
@EnableKafka
// Specify the @KafkaListener class if it's not the same class, or not loaded with the test config
@SpringBootTest(classes = {KafkaController.class})
@EmbeddedKafka(
        partitions = 1,
        controlledShutdown = false,
        brokerProperties = {
                "listeners=PLAINTEXT://localhost:3333",
                "port=3333"
        })
public class KafkaConsumerTest {

    @Autowired
    KafkaEmbedded kafkaEmbeded;

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Before annotation for the setup method:
@Before
public void setUp() throws Exception {
    // Wait until every listener container has been assigned its partitions
    for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
        ContainerTestUtils.waitForAssignment(messageListenerContainer, kafkaEmbeded.getPartitionsPerTopic());
    }
}
Note: I am not using @ClassRule to create the embedded Kafka broker; instead, I am auto-wiring it with @Autowired.
@Test
public void testReceive() throws Exception {
    kafkaTemplate.send(topic, data);
}
Hope this helps!
Edit: Test configuration class marked with @TestConfiguration
@TestConfiguration
public class TestConfig {

    // kafkaEmbedded and topic are not defined in this snippet; they are assumed to be available here

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbedded));
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        kafkaTemplate.setDefaultTopic(topic);
        return kafkaTemplate;
    }
}
Now the @Test method can autowire KafkaTemplate and use it to send a message:
kafkaTemplate.send(topic, data);
Updated answer code block with above line