
EmbeddedKafka: how to check received messages in a unit test

I created a Spring Boot application that sends messages to a Kafka topic. I am using spring-integration-kafka: a KafkaProducerMessageHandler<String,String> is subscribed to a channel (SubscribableChannel) and pushes every message it receives to one topic. The application works fine; I can see messages arriving in Kafka via the console consumer (local Kafka).

I also created an integration test that uses KafkaEmbedded. I check the expected messages by subscribing to the channel within the test, and that works fine.

But I also want the test to check the messages that were actually written to Kafka. Sadly, Kafka's JavaDoc is not the best. What I have tried so far is:

@ClassRule
public static KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true, "myTopic");
//...
@Before
public void init() throws Exception {

    mockConsumer = new MockConsumer<>( OffsetResetStrategy.EARLIEST );
    kafkaEmbedded.consumeFromAnEmbeddedTopic( mockConsumer,"sikom" );

}
//...

@Test
public void endToEnd() throws Exception {
//  ...

    ConsumerRecords<String, String> records = mockConsumer.poll( 10000 );

    StreamSupport.stream(records.spliterator(), false).forEach( record -> log.debug( "record: " + record.value() ) );


}

The problem is that I don't see any records. I am not sure whether my KafkaEmbedded setup is correct, but the messages are received by the channel.

dermoritz asked Feb 08 '18 10:02





1 Answer

This works for me. Give it a try:

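import static org.junit.Assert.assertTrue;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaEmbeddedTest {

    private static final String SENDER_TOPIC = "testTopic";

    // By default KafkaEmbedded creates two partitions per topic.
    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);

    @Test
    public void testSend() throws InterruptedException, ExecutionException {

        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);

        // To send to partitions other than 0 and 1, specify the number
        // of partitions in the KafkaEmbedded declaration.
        try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps)) {
            // Arguments: topic, partition, key, value. get() blocks until the send is acknowledged.
            producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();
            producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();
            producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();
        }

        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sampleRawConsumer", "false", embeddedKafka);
        // Make sure you set the offset reset to earliest, because by the
        // time the consumer starts, the producer might have sent all messages.
        consumerProps.put("auto.offset.reset", "earliest");

        // Thread-safe list: written by the consumer thread, read by the test thread.
        final List<String> receivedMessages = new CopyOnWriteArrayList<>();
        final CountDownLatch latch = new CountDownLatch(3);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(() -> {
            KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
            kafkaConsumer.subscribe(Collections.singletonList(SENDER_TOPIC));
            try {
                // Stop polling once all expected records have arrived.
                while (latch.getCount() > 0) {
                    ConsumerRecords<Integer, String> records = kafkaConsumer.poll(100);
                    records.iterator().forEachRemaining(record -> {
                        receivedMessages.add(record.value());
                        latch.countDown();
                    });
                }
            } finally {
                kafkaConsumer.close();
            }
        });

        // Wait up to 10 seconds for the three records.
        boolean allReceived = latch.await(10, TimeUnit.SECONDS);
        // Interrupts the consumer thread if it is still polling.
        executorService.shutdownNow();

        assertTrue(allReceived);
        assertTrue(receivedMessages.containsAll(Arrays.asList("message00", "message01", "message10")));
    }
}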

I am using a CountDownLatch because the records are consumed asynchronously on a separate thread (producer.send(..) is itself an async operation, although calling .get() blocks until each record is acknowledged). The consumer thread polls Kafka in a loop with a 100 millisecond timeout; whenever a new record arrives, it adds the value to a List for later assertions and counts the latch down. The test thread waits up to 10 seconds in total, just to be sure.
Alternatively, if you don't want to use CountDownLatch and ExecutorService, you can poll in a simple loop on the test thread and exit after a timeout.
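
The simple-loop alternative could look roughly like the sketch below. It is an illustration under assumptions, not part of the tested code above: the names PollUntil and pollUntil are hypothetical, and the Kafka consumer is abstracted behind a Supplier so that the helper depends only on the JDK. It keeps polling until the expected number of values has arrived or a deadline passes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper (not part of Kafka or Spring): polls a source on the
// calling thread until enough values have arrived or the timeout elapses.
final class PollUntil {

    private PollUntil() {
    }

    // pollOnce is assumed to return the next batch of values (possibly empty).
    // With a real KafkaConsumer it should block briefly, e.g. poll(100),
    // otherwise this loop spins without pausing.
    public static <T> List<T> pollUntil(Supplier<List<T>> pollOnce, int expected, long timeoutMillis) {
        List<T> received = new ArrayList<>();
        long start = System.nanoTime();
        long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (received.size() < expected && System.nanoTime() - start < timeoutNanos) {
            received.addAll(pollOnce.get());
        }
        // May contain fewer than 'expected' values if the timeout was hit.
        return received;
    }
}
```

In the test above, the supplier would wrap the consumer, for example () -> { List<String> batch = new ArrayList<>(); kafkaConsumer.poll(100).forEach(r -> batch.add(r.value())); return batch; }, and the test would then assert on the returned list. No second thread or latch is needed, because the loop runs on the test thread itself.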

pvpkiran answered Oct 05 '22 06:10
