I am trying to write a unit test for a Kafka listener that I am developing using Spring Boot 2.x. Since it is a unit test, I don't want to start up a full Kafka server and an instance of ZooKeeper. So, I decided to use Spring Embedded Kafka.
The definition of my listener is very basic.
@Component
public class Listener {
    private final CountDownLatch latch;

    @Autowired
    public Listener(CountDownLatch latch) {
        this.latch = latch;
    }

    @KafkaListener(topics = "sample-topic")
    public void listen(String message) {
        latch.countDown();
    }
}
The test, which verifies that the latch count reaches zero after a message is received, is also very simple.
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "sample-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ListenerTest {
    @Autowired
    private KafkaEmbedded embeddedKafka;

    @Autowired
    private CountDownLatch latch;

    private KafkaTemplate<Integer, String> producer;

    @Before
    public void setUp() {
        this.producer = buildKafkaTemplate();
        this.producer.setDefaultTopic("sample-topic");
    }

    private KafkaTemplate<Integer, String> buildKafkaTemplate() {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
        return new KafkaTemplate<>(pf);
    }

    @Test
    public void listenerShouldConsumeMessages() throws InterruptedException {
        // Given
        producer.sendDefault(1, "Hello world");
        // Then
        assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
    }
}
Unfortunately, the test fails and I cannot understand why. Is it possible to use an instance of KafkaEmbedded to test a method marked with the annotation @KafkaListener?
All the code is shared in my GitHub repository kafka-listener.
Thanks to all.
You are probably sending the message before the consumer has been assigned the topic/partition. Set property...
spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
...it defaults to latest. This is like using --from-beginning with the console consumer.
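To make the timing issue concrete, here is a minimal sketch in plain Java. It is a toy model of a single-partition log, not the Kafka client API: the class and method names (OffsetResetSketch, startOffset, recordsSeen) are hypothetical, and it assumes the consumer group has no committed offset yet and that the log begins at offset 0.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a single-partition log; this is NOT the Kafka client API.
class OffsetResetSketch {

    // Where a consumer with no committed offset starts reading:
    // "earliest" starts at the beginning (offset 0 in this toy model),
    // anything else is treated as "latest" and starts at the log end.
    static int startOffset(String autoOffsetReset, int logEndOffset) {
        return "earliest".equals(autoOffsetReset) ? 0 : logEndOffset;
    }

    // Records visible to a consumer that is assigned the partition only
    // after everything in `log` has already been written.
    static List<String> recordsSeen(List<String> log, String autoOffsetReset) {
        int start = startOffset(autoOffsetReset, log.size());
        return new ArrayList<>(log.subList(start, log.size()));
    }

    public static void main(String[] args) {
        // The test's message was sent before the listener got its assignment.
        List<String> log = List.of("Hello world");
        System.out.println("latest   -> " + recordsSeen(log, "latest"));
        System.out.println("earliest -> " + recordsSeen(log, "earliest"));
    }
}
```

With latest the late consumer sees nothing, so the latch in the test never counts down; with earliest it still receives the record that was sent before the assignment.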
EDIT
Oh, you're not using Boot's properties. Add this to your consumer configuration:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
EDIT2
BTW, you should probably also do a get(10L, TimeUnit.SECONDS) on the result of the template.send() (a Future<>) to assert that the send was successful.
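A minimal sketch of that check, using only java.util.concurrent so it runs without a broker. The CompletableFuture instances stand in for the future returned by the template, and the helper name sendSucceeded is hypothetical. In the test from the question, the equivalent would presumably be calling get(10L, TimeUnit.SECONDS) on the result of producer.sendDefault(1, "Hello world").

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SendResultSketch {

    // Blocks until the send completes or the timeout expires.
    // Returns true only if the future completed normally.
    static boolean sendSucceeded(Future<?> sendResult, long timeout, TimeUnit unit) {
        try {
            sendResult.get(timeout, unit);
            return true;
        } catch (ExecutionException | TimeoutException e) {
            // The send failed, or no result arrived in time.
            return false;
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Stand-ins for the future returned by a send (assumption for this sketch).
        Future<String> ok = CompletableFuture.completedFuture("sent");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));

        System.out.println(sendSucceeded(ok, 10L, TimeUnit.SECONDS));
        System.out.println(sendSucceeded(failed, 10L, TimeUnit.SECONDS));
    }
}
```

Asserting on the send first separates "the message never reached the broker" from "the listener never received it", which makes a failing latch assertion easier to diagnose.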
EDIT3
To override the offset reset just for the test, you can do the same as what you did for the broker addresses:
@Value("${spring.kafka.consumer.auto-offset-reset:latest}")
private String reset;
...
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.reset);
and
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.consumer.auto-offset-reset=earliest"})
However, bear in mind that this property only applies the first time a group consumes. To always start at the end each time the app starts, you have to seek to the end during startup.
Also, I would recommend setting enable.auto.commit to false so that the container takes care of committing the offsets rather than just relying on the consumer client doing it on a time schedule.
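Putting the settings above together, a consumer property map could look like the following sketch. It uses the literal Kafka property names so it compiles without the Kafka client on the classpath; "auto.offset.reset" and "enable.auto.commit" are the strings behind ConsumerConfig.AUTO_OFFSET_RESET_CONFIG and ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG. The class name, the method name, and the values passed in main are hypothetical, and the deserializer settings a real consumer also needs are left out.

```java
import java.util.HashMap;
import java.util.Map;

class ConsumerPropsSketch {

    // Builds the consumer properties discussed above.
    // Key/value deserializer settings are intentionally omitted from this sketch.
    static Map<String, Object> consumerProps(String bootstrapServers,
                                             String groupId,
                                             String autoOffsetReset) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Same key as ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
        props.put("auto.offset.reset", autoOffsetReset);
        // Same key as ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
        // false leaves offset commits to the listener container.
        props.put("enable.auto.commit", false);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical values, for illustration only.
        System.out.println(consumerProps("localhost:9092", "listener-test-group", "earliest"));
    }
}
```

In the application, the third argument would come from the injected reset field, so the test can override it through @TestPropertySource while production keeps its default.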
Maybe someone will find this useful. I had a similar problem.
Locally the tests were passing (some checks were performed within Awaitility.waitAtMost), but in the Jenkins pipeline they were failing.
The solution was, as already mentioned in the most-voted answer, setting auto-offset-reset=earliest.
While the tests are running, you can check whether the configuration was applied correctly by looking at the test logs. Spring outputs the configuration for both the producer and the consumer.