I've set up a simple aggregation that averages values from multiple streams, and I'm trying to test it. I've been burning a lot of time on this and can't seem to get the concepts straight in my head. My stream is below:
// Combine multiple streams together.
KStream<String, IndividualTick> tickerStream =
    priceIndexStreamBuilder.stream(exchangeTopics, Consumed.with(...));
// Group by a key & compute the average per key
KStream<K, AveragedTick> avgTickerStream = tickerStream
    .selectKey((key, value) -> value.getK())
    .groupByKey(...)
    .aggregate(AveragedTick::new,
        (key, value, aggregate) -> {
            aggregate.addTick(value);
            return aggregate;
        },
        Materialized.with(...))
    .toStream();
avgTickerStream.to(sinkTopic, Produced.with(...));
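The topology relies on an aggregate type with an addTick(...) method, but its definition isn't shown. A minimal sketch of what such a running-average aggregate might look like, assuming (hypothetically) that IndividualTick only carries a price; the field and method names other than addTick are made up for illustration:

```java
// Hypothetical stand-in for the question's IndividualTick: only a price is assumed here.
class IndividualTick {
    private final double price;

    IndividualTick(double price) { this.price = price; }

    double getPrice() { return price; }
}

// Hypothetical running-average aggregate matching the addTick(...) call in the topology.
class AveragedTick {
    private long count = 0;     // number of ticks folded in so far
    private double sum = 0.0;   // running sum of prices

    // Folds one tick into the aggregate in place, as the aggregator lambda expects.
    void addTick(IndividualTick tick) {
        count++;
        sum += tick.getPrice();
    }

    long getCount() { return count; }

    // Mean of all ticks seen so far, or NaN if none have been added yet.
    double getAverage() { return count == 0 ? Double.NaN : sum / count; }
}
```

Each call to addTick changes the aggregate, so every input record is a potential output update; whether each of those updates actually reaches the sink is a separate question.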
My test uses EmbeddedKafka, publishes a number of records to the input topics, and blocks on a queue waiting for the records to arrive at sinkTopic.
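Blocking until an exact number of records has arrived is fragile when the number of output records can vary. A sketch of a hypothetical helper (the name QueueDrainer and both timeouts are assumptions, not part of any Kafka or EmbeddedKafka API) that instead collects records until the queue stays empty for a quiet period, with an overall deadline:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical test helper: collect sink records until no new record arrives
// within quietMillis, instead of waiting for a fixed record count.
final class QueueDrainer {
    private QueueDrainer() {}

    static <T> List<T> drainUntilQuiet(BlockingQueue<T> queue, long quietMillis, long maxMillis) {
        List<T> received = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxMillis);
        while (System.nanoTime() < deadline) {
            T next;
            try {
                // Wait at most quietMillis for the next record.
                next = queue.poll(quietMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                break;
            }
            if (next == null) {
                break; // nothing arrived during the quiet period: assume output has settled
            }
            received.add(next);
        }
        return received;
    }
}
```

A quiet period is still a heuristic: it avoids hanging on a count that never arrives, but it does not by itself make the number of records deterministic.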
I'm interested in how this aggregation changes over time, so I want to assert the average on each output tick. I may add some windowing later, but I've tried to keep it simple for now.
When I run my test, I get varying results. Assuming I have 10 input records into my topology:
the AveragedTick serialiser gets called a varying number of times. I think this is because of the record cache defined in KIP-63: the records reach the processing node very quickly and are coalesced, i.e. overwritten by the latest record for the same key. (I am not entirely sure, though.)
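The suspected effect can be illustrated with a toy model. This is a deliberately simplified sketch, not Kafka's actual cache implementation: updates to the same key overwrite each other in a pending map, and only a flush (standing in for a commit or cache eviction) forwards one record per key downstream.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (NOT Kafka's implementation) of a KIP-63-style record cache:
// updates to the same key overwrite each other until the cache is flushed.
final class ToyRecordCache {
    private final Map<String, Double> dirty = new LinkedHashMap<>(); // latest pending value per key
    private final List<String> downstream = new ArrayList<>();       // what the sink would receive

    void put(String key, double value) {
        dirty.put(key, value); // overwrite: earlier un-flushed updates for this key are dropped
    }

    // Stands in for a commit or eviction: forward one record per dirty key, then clear.
    void flush() {
        for (Map.Entry<String, Double> e : dirty.entrySet()) {
            downstream.add(e.getKey() + "=" + e.getValue());
        }
        dirty.clear();
    }

    List<String> downstream() { return downstream; }
}
```

Feeding the same four updates with one flush at the end yields a single downstream record, while flushing halfway yields two; in both cases the last record for the key carries the same final value. Only the number of intermediate records depends on flush timing.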
I have unit tests passing with the ProcessorTopologyTestDriver, but I'm trying to write some acceptance tests for the service that holds this logic.
I've also tried tuning my commit.interval.ms configuration, as well as putting sleeps between publishing my input records, with varying degrees of (flaky) success.
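In Kafka Streams the record cache is flushed on commit, so commit.interval.ms changes how often coalesced results are emitted rather than whether coalescing happens. Setting the cache size to zero disables caching, so every aggregate update is forwarded downstream. A sketch of test-only properties using the plain config names (these are the values behind StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG and StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); the application id, broker address and the 100 ms interval are placeholder assumptions:

```java
import java.util.Properties;

// Hypothetical helper building Kafka Streams properties for an acceptance test.
final class TestStreamsConfig {
    private TestStreamsConfig() {}

    static Properties forDeterministicTest(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // Zero-byte record cache: caching is disabled, so each aggregate update
        // is forwarded downstream instead of being coalesced per key.
        props.put("cache.max.bytes.buffering", "0");
        // Placeholder commit interval; with caching disabled it no longer
        // determines how many updates get merged together.
        props.put("commit.interval.ms", "100");
        return props;
    }
}
```

With caching off, 10 input ticks should lead to 10 aggregate updates on sinkTopic. The trade-off is that production code usually keeps the cache on, so the test then runs with a different setting than the deployed service.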
I feel I'm doing something conceptually wrong here - I just don't know what other approach to take.
Your observation is correct. Caching makes testing hard because it introduces non-determinism.
To write a useful test you have two options:
Btw: the upcoming Kafka 1.1 release adds a public test-utils package, and we plan to add more: https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams
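Because caching makes the number of intermediate results vary, another way to make an assertion robust is to compare only the last value emitted per key, assuming all input has been processed and the cache has been flushed (for example after a commit) before the records are collected. A sketch of a hypothetical helper for that; note that it gives up on checking how the average evolves over time, which the question is interested in:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical test helper: reduce an ordered list of (key, value) output
// records to the last value seen for each key.
final class FinalValues {
    private FinalValues() {}

    static <K, V> Map<K, V> lastPerKey(List<Map.Entry<K, V>> records) {
        Map<K, V> last = new LinkedHashMap<>();
        for (Map.Entry<K, V> r : records) {
            last.put(r.getKey(), r.getValue()); // later records overwrite earlier ones
        }
        return last;
    }
}
```

Coalesced intermediate updates then no longer matter, since only the final aggregate per key is compared.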