
Spring KafkaListener: How to know when it's ready

I have a simple Spring Boot application which reads from Kafka and writes to Kafka. I wrote a SpringBootTest using an EmbeddedKafka to test all that.

The main problem is that the test sometimes fails because it sends the Kafka message too early: the message is written to Kafka before the Spring application (or, to be precise, its KafkaListener) is ready. Since the listener reads from the latest offset (I do not want to change any config for my test, except bootstrap.servers), it misses any message sent before it has started consuming.

Does anyone have an idea how I can find out, inside the test, that the KafkaListener is ready to receive messages?

The only way I can think of is to wait until /health becomes available, but I have no idea whether that implies the KafkaListener is ready at all.

Any help is greatly appreciated!

Best regards.

konse asked Apr 10 '19 17:04



1 Answer

If you have access to the KafkaMessageListenerContainer instance, you can use org.springframework.kafka.test.utils.ContainerTestUtils.waitForAssignment(Object container, int partitions).

https://docs.spring.io/spring-kafka/api/org/springframework/kafka/test/utils/ContainerTestUtils.html

For example, calling ContainerTestUtils.waitForAssignment(container, 1); in your test setup blocks until the container has been assigned 1 partition.
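
If the listener is declared with @KafkaListener, you usually do not hold a reference to its container yourself. A minimal sketch of one way to get at it, assuming JUnit 5, Spring Boot's Kafka auto-configuration, and a single-partition topic: autowire the KafkaListenerEndpointRegistry and wait on every container it knows about. The class name ListenerReadyTest and the topic name input-topic are hypothetical, and the properties attribute is just one way to point bootstrap.servers at the embedded broker.

```java
import org.junit.jupiter.api.BeforeEach;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.ContainerTestUtils;

// Hypothetical test class: the topic name and partition count are assumptions.
// Only bootstrap.servers is overridden, so the listener keeps its normal offset settings.
@SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
@EmbeddedKafka(partitions = 1, topics = "input-topic")
class ListenerReadyTest {

    // Registry that holds the containers created for @KafkaListener methods.
    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @BeforeEach
    void waitForListeners() throws Exception {
        // Wait for each listener container to be assigned its partition
        // before any test method sends a message.
        for (MessageListenerContainer container : registry.getListenerContainers()) {
            ContainerTestUtils.waitForAssignment(container, 1);
        }
    }
}
```

The second argument is the total number of partitions the container is expected to be assigned, so it would need adjusting if a listener subscribes to several topics or to a topic with more than one partition.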

Marcus Wallin answered Oct 20 '22 07:10