I have a simple Spring Boot application which reads from Kafka and writes to Kafka. I wrote a SpringBootTest
using an EmbeddedKafka
to test all that.
The main problem is: Sometimes the test fails because the test sends the Kafka message too early. That way, the message is already written to Kafka before the Spring application (or its KafkaListener
to be precise) is ready. Since the listener reads from the latest
offset (I do not want to change any config for my test - except bootstrap.servers), it will not receive all messages in that test.
Does anyone have an idea how I could know inside the test, that the KafkaListener
is ready to receive messages?
Only way I could think of is waiting until /health
comes available but I have no idea whether I can be sure that this implies the KafkaListener
to be ready at all.
Any help is greatly appreciated!
Best regards.
You can use consumer. assignment() , it will return set of partitions and verify whether all of the partitions are assigned which are available for that topic.
A library that provides an in-memory Kafka instance to run your tests against. Inspired by kafka-unit.
id. java.lang.String id. The unique identifier of the container for this listener. If none is specified an auto-generated id is used. Note: When provided, this value will override the group id property in the consumer factory configuration, unless idIsGroup() is set to false or groupId() is provided.
All of these things are managed by Spring so you can focus on your application code. The annotation KafkaListener instantiates a facility called MessageListenerContainer, which handles parallelization, configuration, retries, and other things that the Kafka application requires, such as offsets.
When you annotate the method, Spring takes care of instantiating the underlying containers that will run your Kafka consumers and read messages from your Kafka topics and handle serialization. All of these things are managed by Spring so you can focus on your application code.
Please, elaborate more what is your real goal. That @KafkaListener is really intended to be called by the Kafka listener container when records are pulled from the topic. What is the reason to call this method manually, not clear yet... @ArtemBilan - My goal is "I have a topic with 20 partitions and has employee data.
The containerFactory () identifies the KafkaListenerContainerFactory to use to build the Kafka listener container. If not set, a default container factory is assumed to be available with a bean name of kafkaListenerContainerFactory unless an explicit default has been provided through configuration.
If you have a KafkaMessageListenerContainer
instance, then it is very easy to use org.springframework.kafka.test.utils.ContainerTestUtils.waitForAssignment(Object container, int partitions)
.
https://docs.spring.io/spring-kafka/api/org/springframework/kafka/test/utils/ContainerTestUtils.html
e.g. calling ContainerTestUtils.waitForAssignment(container, 1);
in your Test setup will block until the container has gotten 1 partition assigned.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With