I'm playing with the TopologyTestDriver of Kafka Streams in order to test our data pipelines.
It has worked like a charm with all our simple topologies, including the stateful ones that use state stores. The problem appears when I try to use this test driver to test topologies that use window aggregation.
I've copied a simple example below that sums integers received with the same key within a 10-second window.
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TopologyWindowTests {

    TopologyTestDriver testDriver;
    String INPUT_TOPIC = "INPUT.TOPIC";
    String OUTPUT_TOPIC = "OUTPUT.TOPIC";

    @Before
    public void setup() {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        // The topology processes <String,Integer> records, so we set those serdes as defaults
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
        testDriver = new TopologyTestDriver(defineTopology(), config, 0L);
    }

    /**
     * Pipes a single record and expects no output until the window has elapsed.
     */
    @Test
    public void testTopologyNoCorrelation() throws IOException {
        ConsumerRecordFactory<String, Integer> factory =
                new ConsumerRecordFactory<>(INPUT_TOPIC, new StringSerializer(), new IntegerSerializer());
        testDriver.pipeInput(factory.create(INPUT_TOPIC, "k", 2, 1L));

        ProducerRecord<String, Integer> outputRecord =
                testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new IntegerDeserializer());

        Assert.assertNull(outputRecord);
    }

    @After
    public void tearDown() {
        testDriver.close();
    }

    /**
     * Defines a topology that sums integer values per key within 10-second windows.
     */
    public Topology defineTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Integer> inputStream = builder.stream(INPUT_TOPIC);
        KTable<Windowed<String>, Integer> groupedMetrics = inputStream
                .groupBy((key, value) -> key, Serialized.with(Serdes.String(), Serdes.Integer()))
                .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(10)))
                .aggregate(
                        () -> 0,
                        (String aggKey, Integer newValue, Integer aggValue) -> aggValue + newValue,
                        Materialized.<String, Integer, WindowStore<Bytes, byte[]>>as("GROUPING.WINDOW")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.Integer())
                );
        groupedMetrics.toStream().map((key, value) -> KeyValue.pair(key.key(), value)).to(OUTPUT_TOPIC);
        return builder.build();
    }
}
I would expect that in this test case nothing is returned to the output topic unless I advance the wall-clock time by 10 seconds... But I'm getting the following output:
java.lang.AssertionError: expected null, but was:<ProducerRecord(topic=OUTPUT.TOPIC, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=k, value=2, timestamp=0)>
Am I missing something here? I'm using Kafka 2.0.0.
Thanks in advance.
UPDATE
Following Matthias's answer, I've prepared the following test:
@Test
public void testTopologyNoCorrelation() throws IOException {
    ConsumerRecordFactory<String, Integer> factory =
            new ConsumerRecordFactory<>(INPUT_TOPIC, new StringSerializer(), new IntegerSerializer());
    testDriver.pipeInput(factory.create(INPUT_TOPIC, "k", 2, 1L));
    testDriver.pipeInput(factory.create(INPUT_TOPIC, "k", 2, 1L));

    // Testing 2+2=4
    ProducerRecord<String, Integer> outputRecord1 =
            testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new IntegerDeserializer());
    Assert.assertEquals(Integer.valueOf(4), outputRecord1.value());

    // Testing no more events in the window
    ProducerRecord<String, Integer> outputRecord2 =
            testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new IntegerDeserializer());
    Assert.assertNull(outputRecord2);
}
Both input messages were sent with the same timestamp, so I'm expecting only one event in the output topic with the sum of my values. However, I'm receiving two events in the output (the first with a value of 2, the second with a value of 4), which I think is not the desired behaviour of the topology.
By default, Kafka Streams operates on event-time for window operations, not wall-clock time. This guarantees deterministic processing semantics (wall-clock-time processing is inherently non-deterministic). Check out the docs for more details: https://docs.confluent.io/current/streams/concepts.html#time
Thus, the timestamps of your input records determine in which window a record is put. Also, the timestamps of your input records advance the internally tracked "stream time" that is based on those event timestamps.
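For example, reusing the factory and driver from the question, only the record timestamp (the last argument to create) decides window placement; a minimal sketch:

    // Sketch, assuming the question's setup: the record timestamp decides the
    // window and advances the internally tracked stream time.
    testDriver.pipeInput(factory.create(INPUT_TOPIC, "k", 2, 1L));     // lands in window [0;10000)
    testDriver.pipeInput(factory.create(INPUT_TOPIC, "k", 3, 11000L)); // lands in window [10000;20000)
    // Each window keeps its own aggregate: 2 for the first window, 3 for the second.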
Also note that Kafka Streams follows a continuous processing model and emits updates instead of waiting for a window-end condition. This is important to handle late-arriving (aka out-of-order) data. Compare How to send final kafka-streams aggregation result of a time windowed KTable? and https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/.
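For completeness, a hedged sketch of that "final result" pattern: since Kafka 2.1 (newer than the 2.0.0 used in the question), suppress() can hold back intermediate updates until the window closes. The snippet below adapts the question's topology under that version assumption:

    // Hedged sketch (requires Kafka Streams 2.1+, NOT the 2.0.0 used above):
    // suppress() holds back intermediate updates and emits only the final
    // result once stream time passes the window end plus the grace period.
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, Integer> input = builder.stream(INPUT_TOPIC);
    input.groupBy((key, value) -> key, Grouped.with(Serdes.String(), Serdes.Integer()))
         .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).grace(Duration.ZERO))
         .aggregate(
                 () -> 0,
                 (aggKey, newValue, aggValue) -> aggValue + newValue,
                 Materialized.with(Serdes.String(), Serdes.Integer()))
         .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
         .toStream()
         .map((key, value) -> KeyValue.pair(key.key(), value))
         .to(OUTPUT_TOPIC);

Note the explicit grace period: with suppress(), the final result is emitted only after stream time passes the window end plus the configured grace.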
Update
It's because of the "update" processing model. When you aggregate, each input record updates the "current" result, and a "current result" output record is produced. This happens for every record (not for every timestamp).
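Concretely, for the updated test in the question this means the driver emits one update per input record. A sketch of the reads you would observe, assuming the question's setup:

    // Each input record produces an updated aggregate for its window.
    testDriver.pipeInput(factory.create(INPUT_TOPIC, "k", 2, 1L));
    testDriver.pipeInput(factory.create(INPUT_TOPIC, "k", 2, 1L));
    ProducerRecord<String, Integer> first =
            testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new IntegerDeserializer());
    Assert.assertEquals(Integer.valueOf(2), first.value()); // first update: 0 + 2
    ProducerRecord<String, Integer> second =
            testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), new IntegerDeserializer());
    Assert.assertEquals(Integer.valueOf(4), second.value()); // second update: 2 + 2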