I created a spring boot application that sends messages to a Kafka topic. I am using spring spring-integration-kafka
:
A KafkaProducerMessageHandler<String,String>
is subscribed to a channel (SubscribableChannel
) and pushes all messages received to one topic.
The application works fine. I see messages arriving in Kafka via console consumer (local kafka).
I also create an Integrationtest that uses KafkaEmbedded
. I am checking the expected messages by subscribing to the channel within the test - all is fine.
But i want the test to check also the messages put into kafka. Sadly Kafka's JavaDoc is not the best. What i tried so far is:
@ClassRule
public static KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true, "myTopic");
//...
@Before
public void init() throws Exception {
mockConsumer = new MockConsumer<>( OffsetResetStrategy.EARLIEST );
kafkaEmbedded.consumeFromAnEmbeddedTopic( mockConsumer,"sikom" );
}
//...
@Test
public void endToEnd() throws Exception {
// ...
ConsumerRecords<String, String> records = mockConsumer.poll( 10000 );
StreamSupport.stream(records.spliterator(), false).forEach( record -> log.debug( "record: " + record.value() ) );
}
The problem is that i don't see any records. I am not sure if my KafkaEmbedded setup is correct. But messages are receive by the channel.
You can use the Kafka-console-consumer to view your messages. It provides a command line utility, bin/kafka-console-consumer.sh, that sends messages from a topic to an output file. To display a maximum number of messages by using: --from-beginning and --max-messages ${NUM_MESSAGES}.
Testing a Kafka Consumer Consuming data from Kafka consists of two main steps. Firstly, we have to subscribe to topics or assign topic partitions manually. Secondly, we poll batches of records using the poll method. The polling is usually done in an infinite loop.
First, we start by decorating our test class with two pretty standard Spring annotations: The @SpringBootTest annotation will ensure that our test bootstraps the Spring application context. We also use the @DirtiesContext annotation, which will make sure this context is cleaned and reset between different tests.
GitHub - embeddedkafka/embedded-kafka: A library that provides an in-memory Kafka instance to run your tests against.
This works for me. Give it a try
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaEmbeddedTest {
private static String SENDER_TOPIC = "testTopic";
@ClassRule
// By default it creates two partitions.
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);
@Test
public void testSend() throws InterruptedException, ExecutionException {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
//If you wish to send it to partitions other than 0 and 1,
//then you need to specify number of paritions in the declaration
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();
producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();
producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sampleRawConsumer", "false", embeddedKafka);
// Make sure you set the offset as earliest, because by the
// time consumer starts, producer might have sent all messages
consumerProps.put("auto.offset.reset", "earliest");
final List<String> receivedMessages = Lists.newArrayList();
final CountDownLatch latch = new CountDownLatch(3);
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
kafkaConsumer.subscribe(Collections.singletonList(SENDER_TOPIC));
try {
while (true) {
ConsumerRecords<Integer, String> records = kafkaConsumer.poll(100);
records.iterator().forEachRemaining(record -> {
receivedMessages.add(record.value());
latch.countDown();
});
}
} finally {
kafkaConsumer.close();
}
});
latch.await(10, TimeUnit.SECONDS);
assertTrue(receivedMessages.containsAll(Arrays.asList("message00", "message01", "message10")));
}
}
I am using countdown latch because Producer.Send(..)
is an async operation. So what i am doing here is waiting in an infinite loop polling kafka every 100 milliseconds, if there is new record and if so adding it to a List for future assertions and then reducing the countdown. And I will wait for 10 seconds in total just to be sure.
You can as well use a simple loop and then exit after a few minutes.(If you don't wish to use CountdownLatch and ExecutorService stuff)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With