I have a Kafka stream that takes data from a topic, and needs to filter that information to two different topics.
KStream<String, Model> stream = builder.stream(Serdes.String(), specificAvroSerde, "not-filtered-topic");
stream.filter((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "good-topic");
stream.filterNot((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "bad-topic");
However, when I do it like this, it reads the data from the topic twice -- not sure if that has any impact on performance as the data gets larger. Is there a way to just filter it once and push it to two topics?
Your approach is correct: the data is not read twice from the topic, and there is no internal data replication going on. The only disadvantage of your approach is that both filter predicates are evaluated for each record. However, this is quite cheap and should not be a performance issue.
However, you could still improve performance by using KStream#branch(), which takes multiple predicates, evaluates them one after another, and returns one output stream for each predicate. If a record matches a predicate, it is put into the corresponding output stream and the evaluation stops (i.e., no further predicate is evaluated for this record). This ensures that each record is added to at most one output stream, or is dropped if no predicate matches.
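To make the first-match behavior concrete, here is a minimal plain-Java sketch that mimics it on an in-memory list. It does not use Kafka Streams at all: the class BranchSketch, its branch() helper, and the "value >= 0" test are hypothetical stand-ins chosen for illustration, not the library's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

public class BranchSketch {

    // Hypothetical stand-in for KStream#branch(): one output list per predicate.
    @SafeVarargs
    public static <K, V> List<List<Map.Entry<K, V>>> branch(
            List<Map.Entry<K, V>> records,
            BiPredicate<K, V>... predicates) {
        List<List<Map.Entry<K, V>>> outputs = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            outputs.add(new ArrayList<>());
        }
        for (Map.Entry<K, V> record : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(record.getKey(), record.getValue())) {
                    outputs.get(i).add(record);
                    break; // first match wins; later predicates are not evaluated
                }
            }
            // A record that matches no predicate ends up in no output list.
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records = List.of(
                Map.entry("a", 5), Map.entry("b", -1), Map.entry("c", 12));

        List<List<Map.Entry<String, Integer>>> split = branch(
                records,
                (key, value) -> value >= 0, // assumed "good" test, for illustration only
                (key, value) -> true);      // catch-all for everything else

        System.out.println("good: " + split.get(0));
        System.out.println("bad:  " + split.get(1));
    }
}
```

With a catch-all as the last predicate, every record lands in exactly one list. Without it, records that fail every test are silently dropped.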
Thus, you can just provide two predicates to branch(): the first one is the same as your original filter() predicate, and the second predicate always returns true.
    KStream<String, Model> stream = builder.stream(
        Serdes.String(),
        specificAvroSerde,
        "not-filtered-topic"
    );

    // Index 0 receives records that pass the test; index 1 catches everything else.
    KStream<String, Model>[] splitStreams = stream.branch(
        (key, value) -> new Processor().test(key, value),
        (key, value) -> true
    );

    splitStreams[0].to(Serdes.String(), specificAvroSerde, "good-topic");
    splitStreams[1].to(Serdes.String(), specificAvroSerde, "bad-topic");
I'm not sure this code is more readable than your original version, though. I guess it's a matter of taste, and I personally like your original code better because it expresses the semantics more clearly.
The version I added should be slightly more CPU efficient: for all records that satisfy the predicate, it is evaluated only once, and for all records that do not satisfy it, the second predicate simply returns true (i.e., there is no second evaluation of the real predicate).
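The difference can be illustrated by counting how often the real predicate runs in each variant. The following is a rough plain-Java sketch, not Kafka Streams code: the class PredicateCallCount, both method names, and the "v >= 0" test are hypothetical and only model the evaluation order described above.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

public class PredicateCallCount {

    // Models filter() plus filterNot(): the real predicate runs twice per record.
    public static int callsWithFilterPair(List<Integer> values) {
        AtomicInteger calls = new AtomicInteger();
        Predicate<Integer> counted = v -> {
            calls.incrementAndGet();
            return v >= 0; // assumed test, for illustration only
        };
        for (Integer v : values) {
            counted.test(v);          // evaluation done by filter(...)
            counted.negate().test(v); // evaluation done by filterNot(...)
        }
        return calls.get();
    }

    // Models branch(realPredicate, alwaysTrue): the real predicate runs once per record.
    public static int callsWithBranch(List<Integer> values) {
        AtomicInteger calls = new AtomicInteger();
        Predicate<Integer> counted = v -> {
            calls.incrementAndGet();
            return v >= 0; // same assumed test as above
        };
        Predicate<Integer> catchAll = v -> true;
        for (Integer v : values) {
            if (!counted.test(v)) {
                catchAll.test(v); // trivial fallback, does not touch the counter
            }
        }
        return calls.get();
    }

    public static void main(String[] args) {
        List<Integer> values = List.of(5, -1, 12, -7);
        System.out.println("filter + filterNot: " + callsWithFilterPair(values));
        System.out.println("branch:             " + callsWithBranch(values));
    }
}
```

For four input records, the filter pair calls the real predicate eight times and the branch variant four times, which matches the reasoning above. Whether that saving is measurable depends on how expensive the predicate is.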
If you know that most records will end up in splitStreams[1], you could also invert the predicate (and use splitStreams[0] as the "bad" stream) to decrease the number of calls to the second, true-returning predicate. But those are only micro-optimizations and should not matter.