How to unit test a kafka stream application that uses session window

I am working with Kafka Streams 2.1.

I am trying to write some tests for a stream application that aggregates events by their key (i.e. by a correlation ID) using a session window with an inactivity gap of 300 ms.

Here is the aggregation implementation, represented by a method:

    private static final int INACTIVITY_GAP = 300;

    public KStream<String, AggregatedCustomObject> aggregate(KStream<String, CustomObject> source) {

        return source
                // group by key (i.e by correlation ID)
                .groupByKey(Grouped.with(Serdes.String(), new CustomSerde()))
                // Define a session window with an inactivity gap of 300 ms
                .windowedBy(SessionWindows.with(Duration.ofMillis(INACTIVITY_GAP)).grace(Duration.ofMillis(INACTIVITY_GAP)))
                .aggregate(
                        // initializer
                        () -> new AggregatedCustomObject(),
                        // aggregates records in same session
                        (s, customObject, aggCustomObject) -> {
                            // ...
                            return aggCustomObject;
                        },
                        // merge sessions
                        (s, aggCustomObject1, aggCustomObject2) -> {
                            // ...
                            return aggCustomObject2;
                        },
                        Materialized.with(Serdes.String(), new AggCustomObjectSerde())
                )
                .suppress(Suppressed.untilWindowCloses(unbounded()))
                .toStream()
                .selectKey((stringWindowed, aggCustomObject) -> "someKey");
    }

This stream process works as expected. But for unit tests, that's a different story.

My test stream configuration looks like this:

        // ...

        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, myCustomObjectSerde.getClass());
        // disable cache
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        // commit ASAP
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);


        StreamsBuilder builder = new StreamsBuilder();
        aggregate(builder.stream(INPUT_TOPIC)).to(OUTPUT_TOPIC);

        Topology topology = builder.build();
        TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
        ConsumerRecordFactory<String, MyCustomObject> factory = new ConsumerRecordFactory<>(INPUT_TOPIC, new StringSerializer(), myCustomSerializer);

        // ...

And a test would look as follow:

List<ConsumerRecord<byte[], byte[]>> records = myCustomMessages.stream()
        .map(myCustomMessage -> factory.create(INPUT_TOPIC, myCustomMessage.correlationId, myCustomMessage))
        .collect(Collectors.toList());
testDriver.pipeInput(records);

ProducerRecord<String, AggregatedCustomMessage> record = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), myAggregatedCustomObjectSerde.deserializer());

The problem is, the record is always null. I tried a lot of things:

  • read in a loop with a timeout
  • change the commit interval in the config so the result would be committed ASAP
  • send an additional record with a different key just after (to trigger the window closing, since in Kafka Streams event-time is based on record timestamps)
  • call the advanceWallClockTime method of the test driver

Well, nothing helps. Could someone tell me what I am missing, and how I should test a session-window-based stream application?

Thanks a lot

asked Aug 13 '19 by gnos

People also ask

How do I test a Kafka based application?

To test Kafka-based services in ReadyAPI, you use the API Connection test step. This test step is linked to either the Publish or Subscribe Kafka operation. Depending on a base operation, the test step works as a producer or as a consumer.

How do I test Kafka streaming?

Define the input data (aka the test fixture), send it to the mocks of the input topics (TestInputTopic), read what was sent by the topology from the mocks of the output topics (TestOutputTopic), and validate the result, comparing it to what was expected.
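
For reference, here is a minimal sketch of that flow using the TestInputTopic / TestOutputTopic classes (added to kafka-streams-test-utils in 2.4, so newer than the 2.1 API used in the question); the topic names, String serdes, timestamps and values are placeholders:

import java.time.Instant;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;

public class SessionWindowTopologyTest {

    public void pipeAndRead(Topology topology, Properties props) {
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            // mock of the input topic: records piped here go straight into the topology
            TestInputTopic<String, String> input = driver.createInputTopic(
                    "input-topic", Serdes.String().serializer(), Serdes.String().serializer());
            // mock of the output topic: records produced by the topology are read from here
            TestOutputTopic<String, String> output = driver.createOutputTopic(
                    "output-topic", Serdes.String().deserializer(), Serdes.String().deserializer());

            // 1. define the fixture, 2. send it with explicit event timestamps
            input.pipeInput("corr-1", "event-A", Instant.parse("2019-08-13T00:00:00Z"));
            input.pipeInput("corr-1", "event-B", Instant.parse("2019-08-13T00:00:00.100Z"));

            // 3. read what the topology emitted, 4. validate it
            while (!output.isEmpty()) {
                KeyValue<String, String> kv = output.readKeyValue();
                System.out.println(kv.key + " -> " + kv.value);
            }
        }
    }
}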

What is Kafka streaming windowing?

Windowing. Windowing allows you to bucket stateful operations by time, without which your aggregations would endlessly accumulate. A window gives you a snapshot of an aggregate within a given timeframe, and can be set as hopping, tumbling, session, or sliding.
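As an illustration only, here is roughly how those window types are declared in the Kafka Streams 2.1 DSL that the question uses; the durations are arbitrary, and since a dedicated SlidingWindows class only arrived in later releases, sliding behaviour is shown via the JoinWindows used for stream-stream joins:

import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;

class WindowExamples {
    // tumbling: fixed-size, non-overlapping, gap-less windows
    TimeWindows tumbling = TimeWindows.of(Duration.ofMinutes(5));

    // hopping: fixed-size windows that overlap because they advance by less than their size
    TimeWindows hopping = TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1));

    // session: window bounds are driven by activity, separated by an inactivity gap
    SessionWindows session = SessionWindows.with(Duration.ofMillis(300));

    // sliding (join) windows: records of two streams join when their timestamps are close enough
    JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(10));
}

The session window is the one used in the question: its end is not fixed up front but moves with each new record for the key, and the window only closes once no record arrives within the inactivity gap.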

How do you write a unit test for Kafka consumer?

Consuming data from Kafka consists of two main steps. Firstly, we have to subscribe to topics or assign topic partitions manually. Secondly, we poll batches of records using the poll method. The polling is usually done in an infinite loop.
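
A minimal sketch of those two steps using the MockConsumer shipped with kafka-clients, so no broker is needed; the topic name, key and value below are invented for the example:

import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

class ConsumerUnitTestSketch {

    void pollsTheRecordsWeStaged() {
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);

        // step 1: assign a topic partition manually (subscribe() also works, with a rebalance)
        TopicPartition partition = new TopicPartition("orders", 0);
        consumer.assign(Collections.singletonList(partition));
        consumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));

        // stage a record that the next poll will return
        consumer.addRecord(new ConsumerRecord<>("orders", 0, 0L, "order-1", "{\"amount\": 42}"));

        // step 2: poll, normally inside a loop in the code under test
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        assert records.count() == 1;
    }
}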


1 Answer

SessionWindows work with event-time and not wall-clock time. Try to set the event-time of your records properly to simulate the inactivity gap. Something like:

testDriver.pipeInput(factory.create(INPUT_TOPIC, key1, record1, eventTimeMs));
testDriver.pipeInput(factory.create(INPUT_TOPIC, key2, record2, eventTimeMs + inactivityGapMs));

But first, you need a custom TimestampExtractor like:

 public static class RecordTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
      return record.timestamp();
    }
  }

which has to be registered like:

 streamProperties.setProperty(
        StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
        RecordTimestampExtractor.class.getName()
    );
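
One more detail that matters with the topology in the question: because of suppress(Suppressed.untilWindowCloses(...)), the aggregate is only released once stream time, which is driven purely by record timestamps, has moved past the session end plus the grace period. A hedged sketch of a complete test input under the asker's 2.1 setup (record1, record2 and dummyRecord are placeholders):

long t0 = 0L;

// two records for the same correlation ID, 100 ms apart, so they fall into one session
testDriver.pipeInput(factory.create(INPUT_TOPIC, "corr-1", record1, t0));
testDriver.pipeInput(factory.create(INPUT_TOPIC, "corr-1", record2, t0 + 100));

// one extra record far enough ahead (more than gap + grace after the last event) so that
// stream time passes the window close and suppress() releases the aggregated result
testDriver.pipeInput(factory.create(INPUT_TOPIC, "other-key", dummyRecord, t0 + 100 + 2 * INACTIVITY_GAP + 1));

ProducerRecord<String, AggregatedCustomObject> result =
        testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), myAggregatedCustomObjectSerde.deserializer());

With that in place, readOutput should return the released aggregate rather than null, assuming the serdes line up.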
answered Sep 24 '22 by Peyman