Testing KafkaStreams applications

I've set up a simple aggregation that averages values from multiple streams, and I'm trying to test it. I've burned a lot of time on this and I can't seem to get the concepts straight in my head. My stream is below:

// Combine multiple streams together.
KStream<String, IndividualTick> tickStream =
        priceIndexStreamBuilder.stream(exchangeTopics, Consumed.with(...));

// Group by a key & compute the average per key.
KStream<K, AveragedTick> avgTickStream = tickStream
        .selectKey((key, value) -> value.getK())
        .groupByKey(...)
        .aggregate(AveragedTick::new,
                (key, value, aggregate) -> {
                    aggregate.addTick(value);
                    return aggregate;
                },
                Materialized.with(...))
        .toStream();

avgTickStream.to(sinkTopic, Produced.with(...));

My test uses EmbeddedKafka, posts a bunch of records to the input topics, and then blocks on a queue waiting for records to arrive at sinkTopic.
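The consuming side of that test looks roughly like the sketch below (simplified; AveragedTick and its deserializer come from my setup, and I'm assuming String keys and an arbitrary timeout):

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Poll the sink topic until `expected` records arrive (or a timeout hits),
// collecting them into a queue that the test's assertions drain.
static BlockingQueue<ConsumerRecord<String, AveragedTick>> drainSinkTopic(
        String bootstrapServers, String sinkTopic,
        Deserializer<AveragedTick> valueDeserializer, int expected) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "acceptance-test");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    BlockingQueue<ConsumerRecord<String, AveragedTick>> out = new LinkedBlockingQueue<>();
    try (KafkaConsumer<String, AveragedTick> consumer =
                 new KafkaConsumer<>(props, new StringDeserializer(), valueDeserializer)) {
        consumer.subscribe(Collections.singletonList(sinkTopic));
        long deadline = System.currentTimeMillis() + 30_000;
        while (out.size() < expected && System.currentTimeMillis() < deadline) {
            for (ConsumerRecord<String, AveragedTick> rec : consumer.poll(100)) {
                out.add(rec);
            }
        }
    }
    return out;
}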

I'm interested in how this aggregation changes over time, so I'm looking to assert the average carried by each output record. I may add some level of windowing, but I've tried to keep it simple for now.

When I run my test I get varying results. Given 10 input records to my topology:

  • My aggregator gets called 10 times.
  • A breakpoint I put inside my AveragedTick serialiser gets hit a varying number of times.
  • The record values I assert on in my tests vary from run to run.

I think this is because of the caching behaviour introduced in KIP-63: records arrive at the processing node in quick succession, and intermediate aggregates are coalesced, overwritten by the latest record. (I am not entirely sure, though.)

I have unit tests passing with the ProcessorTopologyTestDriver, but I'm trying to write some acceptance tests for the service that holds this logic.

I've also tried playing with my commit.interval.ms configuration, as well as putting sleeps between publishing my input records, with varying degrees of (flaky) success.
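Concretely, that tweaking amounts to something like this (a sketch; the value is just one of several I tried):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties(); // the service's existing Streams config
// Commit (and thus flush the cache) more frequently, hoping output
// records reach the sink topic sooner.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);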

  • Do these kinds of tests even make sense?
  • How can I assert correctness of this microservice against a real Kafka instance?

I feel I'm doing something conceptually wrong here - I just don't know what other approach to take.

Asked by jaker

1 Answer

Your observation is correct. Caching makes testing hard because it introduces non-determinism.

To write a useful test, you have two options (a sketch of both follows the list):

  • Disable caching by setting the cache size to zero. This way, all output records, including all intermediate ones, are deterministic.
  • Only check the last result record per key. For fixed input data, this last result is always the same, independent of caching.
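In code, the two options look roughly like this (a sketch; `receivedRecords` stands for whatever collection your test drains from the sink topic, and AveragedTick is the value type from your question):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.StreamsConfig;

// Option 1: disable the record cache so every intermediate aggregate is
// forwarded downstream; the full output sequence becomes deterministic.
Properties props = new Properties(); // plus your usual Streams settings
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

// Option 2: keep caching on, but assert only on the final aggregate per
// key; later output records simply overwrite earlier intermediate ones.
Map<String, AveragedTick> lastPerKey = new HashMap<>();
for (ConsumerRecord<String, AveragedTick> rec : receivedRecords) {
    lastPerKey.put(rec.key(), rec.value());
}
// assertEquals(expectedAverage, lastPerKey.get("someKey"));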

Btw: the upcoming Kafka 1.1 release adds a public test-utils package, and we plan to add more: https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams
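With those test utils, a test of your topology could look roughly like this sketch (assuming priceIndexStreamBuilder is a StreamsBuilder; the topic name, serializers, deserializers, and expected values are placeholders for whatever your topology uses):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted

Topology topology = priceIndexStreamBuilder.build(); // your topology from above
TopologyTestDriver driver = new TopologyTestDriver(topology, props);

// Pipe input records through the topology synchronously; `tickSerializer`
// is whatever serializer your IndividualTick values use.
ConsumerRecordFactory<String, IndividualTick> factory =
        new ConsumerRecordFactory<>("input-topic", new StringSerializer(), tickSerializer);
driver.pipeInput(factory.create("input-topic", "key", tick1));
driver.pipeInput(factory.create("input-topic", "key", tick2));

// Read the output deterministically and assert on it; `avgTickDeserializer`
// is your AveragedTick deserializer.
ProducerRecord<String, AveragedTick> output =
        driver.readOutput(sinkTopic, new StringDeserializer(), avgTickDeserializer);
OutputVerifier.compareKeyValue(output, "key", expectedAverage);

driver.close();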

Answered by Matthias J. Sax


