Testing window aggregation with Kafka Streams

I'm experimenting with the TopologyTestDriver of Kafka Streams to get our data pipelines tested.

It has worked like a charm with all our simple topologies, including the stateful ones that use state stores. The problem appears when I use this test driver to test topologies that use windowed aggregation.

Below is a simple example that sums the integers received with the same key within a 10-second window.

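import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.junit.*;

public class TopologyWindowTests {

    // OUTPUT.TOPIC matches the error message below; the input topic name is assumed
    static final String INPUT_TOPIC = "INPUT.TOPIC";
    static final String OUTPUT_TOPIC = "OUTPUT.TOPIC";

    TopologyTestDriver testDriver;

    @Before
    public void setup() {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        // The topology processes <String, Integer> records,
        // so we set those serdes as defaults
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
        // The last argument is the driver's initial wall-clock time (ms)
        testDriver = new TopologyTestDriver(defineTopology(), config, 0L);
    }

    /**
     * Topology test
     */
    @Test
    public void testTopologyNoCorrelation() throws IOException {
        ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(INPUT_TOPIC, new StringSerializer(), new IntegerSerializer());
        // Assumed input, inferred from the reported output: key "k", value 2, timestamp 0
        testDriver.pipeInput(factory.create("k", 2, 0L));

        ProducerRecord<String, Integer> outputRecord = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new IntegerDeserializer());
        // Expectation: nothing is emitted until the window closes
        Assert.assertNull(outputRecord);
    }

    @After
    public void tearDown() {
        testDriver.close();
    }

    /**
     * Defines the topology
     * @return a topology that sums values per key within 10-second windows
     */
    public Topology defineTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Integer> inputStream = builder.stream(INPUT_TOPIC);

        KTable<Windowed<String>, Integer> groupedMetrics = inputStream
                .groupBy((key, value) -> key)
                .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(10)))
                .aggregate(
                        () -> 0,
                        (String aggKey, Integer newValue, Integer aggValue) -> aggValue + newValue);

        // Assumed: the windowed key is unwrapped to a plain String key before writing
        groupedMetrics.toStream()
                .map((windowedKey, value) -> KeyValue.pair(windowedKey.key(), value))
                .to(OUTPUT_TOPIC);

        return builder.build();
    }
}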


I would expect nothing to be written to the output topic in this test case until I advance the wall-clock time by 10 seconds. However, I'm getting the following output:

java.lang.AssertionError: expected null, but was:<ProducerRecord(topic=OUTPUT.TOPIC, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=k, value=2, timestamp=0)>

Am I missing something here? I'm using Kafka 2.0.0.


Thanks in advance

Following Matthias's answer, I've prepared the following test:

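@Test
public void testTopologyNoCorrelation() throws IOException {
    ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(INPUT_TOPIC, new StringSerializer(), new IntegerSerializer());
    // Two records with the same key and the same timestamp (0 is assumed here)
    testDriver.pipeInput(factory.create("k", 2, 0L));
    testDriver.pipeInput(factory.create("k", 2, 0L));

    // Testing 2+2=4
    ProducerRecord<String, Integer> outputRecord1 = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new IntegerDeserializer());
    Assert.assertEquals(Integer.valueOf(4), outputRecord1.value());

    // Testing no more events in the window
    ProducerRecord<String, Integer> outputRecord2 = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new IntegerDeserializer());
    Assert.assertNull(outputRecord2);
}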

Both input messages were sent with the same timestamp, so I expect only one event in the output topic, containing the sum of my values. However, I'm receiving two events in the output (the first with a value of 2 and the second with a value of 4), which I don't think is the desired behaviour of the topology.

By default, Kafka Streams operates on event time for window operations, not wall-clock time. This guarantees deterministic processing semantics (wall-clock-time processing is inherently non-deterministic). Check out the docs for more details: https://docs.confluent.io/current/streams/concepts.html#time

Thus, the timestamps of your input records determine which window a record is put into. The timestamps of your input records also advance the internally tracked "stream time", which is based on those event timestamps.
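To make the event-time point concrete, here is a minimal plain-Java sketch. It is not Kafka Streams code: the class `EventTimeWindowing` and its methods are hypothetical, and it assumes tumbling windows (advance equal to size) and non-negative timestamps. It picks a record's 10-second window from its timestamp alone and tracks stream time as the largest timestamp seen so far; nothing in it reads the wall clock.

```java
// Hypothetical helper, not a Kafka Streams API: assigns a record to a tumbling
// window using only its event timestamp and tracks "stream time".
public class EventTimeWindowing {

    // Stream time: the largest event timestamp observed so far (never decreases)
    private long streamTimeMs = Long.MIN_VALUE;

    // Start of the tumbling window (advance == size) containing timestampMs;
    // assumes timestampMs >= 0
    public static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Advances stream time with a record's timestamp and returns the current stream time
    public long observe(long eventTimestampMs) {
        streamTimeMs = Math.max(streamTimeMs, eventTimestampMs);
        return streamTimeMs;
    }

    public static void main(String[] args) {
        long sizeMs = 10_000L; // 10-second windows, as in the question
        EventTimeWindowing tracker = new EventTimeWindowing();
        long[] timestamps = {0L, 9_999L, 10_000L, 3_000L}; // 3_000 arrives out of order
        for (long ts : timestamps) {
            long start = windowStart(ts, sizeMs);
            long streamTime = tracker.observe(ts);
            System.out.println("ts=" + ts + " window=[" + start + ", " + (start + sizeMs)
                    + ") streamTime=" + streamTime);
        }
    }
}
```

In this sketch the out-of-order record at 3000 ms still lands in window [0, 10000), and stream time stays at 10000 instead of moving backwards.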

Also note that Kafka Streams follows a continuous processing model and emits updates instead of waiting for a window-end condition. This is important for handling late-arriving (aka out-of-order) data. Compare "How to send final kafka-streams aggregation result of a time windowed KTable?" and https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/.
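For contrast, the following plain-Java sketch simulates what waiting for a window-end condition could look like. It is a hypothetical simulation (`WindowEndEmitter` is not a Kafka Streams class), assuming tumbling windows, no grace period, and that late records for already-closed windows are simply dropped; how Kafka Streams actually treats late data depends on the window configuration. The point it illustrates is that even a window-end condition is evaluated against stream time, so the result for window [0, 10000) appears only when a record with timestamp 10000 or later arrives, not when wall-clock time passes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation, not Kafka Streams code: a tumbling-window sum that emits
// only when stream time (the max event timestamp seen) reaches the window end.
public class WindowEndEmitter {
    private final long sizeMs;
    private long streamTimeMs = Long.MIN_VALUE;
    private final Map<String, Integer> sums = new HashMap<>();    // "key@windowStart" -> running sum
    private final Map<String, Long> ends = new LinkedHashMap<>(); // "key@windowStart" -> window end

    public WindowEndEmitter(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    // Processes one record; returns "key@windowStart=sum" for every window it closed
    public List<String> process(String key, int value, long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long start = timestampMs - (timestampMs % sizeMs);
        long end = start + sizeMs;
        if (end > streamTimeMs) {
            // Window still open: fold the value into its running sum
            String id = key + "@" + start;
            sums.merge(id, value, Integer::sum);
            ends.put(id, end);
        }
        // Otherwise the window has already closed and this sketch drops the late record

        // Close (and emit) every window whose end stream time has reached
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = ends.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= streamTimeMs) {
                emitted.add(entry.getKey() + "=" + sums.remove(entry.getKey()));
                it.remove();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        WindowEndEmitter emitter = new WindowEndEmitter(10_000L);
        System.out.println(emitter.process("k", 2, 0L));      // window [0, 10000) still open
        System.out.println(emitter.process("k", 2, 0L));      // still open
        System.out.println(emitter.process("k", 1, 10_000L)); // stream time reached 10000: k@0=4
    }
}
```

Holding results back like this trades latency and extra state for a single final value per window, which is exactly what the continuous model avoids by default.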


It's because of the "update" processing model. When you aggregate, each input record updates the "current" result, and a "current result" output record is produced. This happens for every record, not for every timestamp.
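A minimal plain-Java sketch of this update model, replaying the two records from the second test (key `k`, value 2, both at timestamp 0; the timestamp value is an assumption). `UpdateModelSum` is a hypothetical class, not part of Kafka Streams: it keeps one running sum per key and window and returns the current result after every input record, which is why two input records yield two outputs, 2 and then 4.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation, not Kafka Streams code: every input record updates the
// running sum of its (key, window) and immediately emits that current result.
public class UpdateModelSum {
    private final long sizeMs;
    private final Map<String, Integer> sums = new HashMap<>(); // "key@windowStart" -> sum

    public UpdateModelSum(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    // Returns the "current result" emitted downstream for this input record
    public int process(String key, int value, long timestampMs) {
        long windowStart = timestampMs - (timestampMs % sizeMs); // tumbling window, ts >= 0
        return sums.merge(key + "@" + windowStart, value, Integer::sum);
    }

    public static void main(String[] args) {
        UpdateModelSum agg = new UpdateModelSum(10_000L);
        List<Integer> emitted = new ArrayList<>();
        // The two records from the second test: same key, same timestamp
        emitted.add(agg.process("k", 2, 0L));
        emitted.add(agg.process("k", 2, 0L));
        System.out.println(emitted); // [2, 4]
    }
}
```

Both outputs belong to the same window; the second simply supersedes the first, just as a later update in the windowed KTable replaces the earlier value for that key and window.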

