I am working with Kafka Streams 2.1.
I am trying to write some tests for a stream application that aggregates some events by their key (i.e. by a correlation ID) using a session window with an inactivity gap of 300 ms.
Here is the aggregation implementation, represented by a method:
private static final int INACTIVITY_GAP = 300;

public KStream<String, AggregatedCustomObject> aggregate(KStream<String, CustomObject> source) {
    return source
            // group by key (i.e. by correlation ID)
            .groupByKey(Grouped.with(Serdes.String(), new CustomSerde()))
            // define a session window with an inactivity gap of 300 ms
            .windowedBy(SessionWindows.with(Duration.ofMillis(INACTIVITY_GAP)).grace(Duration.ofMillis(INACTIVITY_GAP)))
            .aggregate(
                    // initializer
                    () -> new AggregatedCustomObject(),
                    // aggregates records in the same session
                    (s, customObject, aggCustomObject) -> {
                        // ...
                        return aggCustomObject;
                    },
                    // merges sessions
                    (s, aggCustomObject1, aggCustomObject2) -> {
                        // ...
                        return aggCustomObject2;
                    },
                    Materialized.with(Serdes.String(), new AggCustomObjectSerde())
            )
            .suppress(Suppressed.untilWindowCloses(unbounded()))
            .toStream()
            .selectKey((stringWindowed, aggCustomObject) -> "someKey");
}
This stream application works as expected. But for unit tests, that's a different story.
My test stream configuration looks like this:
// ...
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, myCustomObjectSerde.getClass());
// disable cache
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
// commit ASAP
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);
StreamsBuilder builder = new StreamsBuilder();
aggregate(builder.stream(INPUT_TOPIC)).to(OUTPUT_TOPIC);
Topology topology = builder.build();
TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
ConsumerRecordFactory<String, CustomObject> factory = new ConsumerRecordFactory<>(INPUT_TOPIC, new StringSerializer(), myCustomSerializer);
// ...
And a test would look as follows:
List<ConsumerRecord<byte[], byte[]>> records = myCustomObjects.stream()
        .map(myCustomObject -> factory.create(INPUT_TOPIC, myCustomObject.correlationId, myCustomObject))
        .collect(Collectors.toList());
testDriver.pipeInput(records);
ProducerRecord<String, AggregatedCustomObject> record = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), myAggregatedCustomObjectDeserializer);
The problem is, record is always null.
I tried a lot of things, including the advanceWallClockTime method of the test driver. Well, nothing helps. Could someone tell me what I am missing, and how I should test a session-window-based stream application?
Thanks a lot
SessionWindows work with event-time and not wall-clock time. Try to set the event-time of your records properly to simulate the inactivity gap. Something like:
testDriver.pipeInput(factory.create(INPUT_TOPIC, key1, record1, eventTimeMs));
testDriver.pipeInput(factory.create(INPUT_TOPIC, key2, record2, eventTimeMs + inactivityGapMs));
But first, you need a custom TimestampExtractor like:
public static class RecordTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        return record.timestamp();
    }
}
which has to be registered like:
streamProperties.setProperty(
StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
RecordTimestampExtractor.class.getName()
);
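With the extractor registered, keep in mind that suppress(Suppressed.untilWindowCloses(...)) only emits a session once stream time has advanced past the window end plus the grace period, and stream time only moves forward when a new record is processed. Here is a minimal sketch of a complete test, reusing the factory and testDriver from the question (customObject1..3 and myAggregatedCustomObjectDeserializer are hypothetical placeholders):

// two records with the same correlation ID, less than 300 ms apart,
// fall into the same session
long start = 0L;
testDriver.pipeInput(factory.create(INPUT_TOPIC, "corr-1", customObject1, start));
testDriver.pipeInput(factory.create(INPUT_TOPIC, "corr-1", customObject2, start + 100L));

// a later record (any key) advances stream time well past the session
// end (start + 100 ms) plus the grace period (300 ms), so suppress()
// can emit the closed session
testDriver.pipeInput(factory.create(INPUT_TOPIC, "corr-2", customObject3, start + 1000L));

ProducerRecord<String, AggregatedCustomObject> result =
        testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), myAggregatedCustomObjectDeserializer);
assertNotNull(result);

Without that final record, stream time never advances, the window never closes, and readOutput keeps returning null, which is exactly the symptom described in the question.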