
Kafka - How to use filter and filterNot at the same time?

I have a Kafka stream that takes data from a topic, and needs to filter that information to two different topics.

KStream<String, Model> stream = builder.stream(Serdes.String(), specificAvroSerde, "not-filtered-topic");
stream.filter((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "good-topic");
stream.filterNot((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "bad-topic");

However, when I do it like this, it reads the data from the topic twice -- not sure if that has any impact on performance as the data gets larger. Is there a way to just filter it once and push it to two topics?

m1771vw asked Dec 01 '16 18:12




1 Answer

Your approach is correct: the data is not read twice from the topic, and no internal data replication takes place. The only disadvantage of your approach is that both filter predicates are evaluated for each record. However, this is quite cheap and should not be a performance issue.

However, you could still improve performance by using KStream#branch(), which takes multiple predicates, evaluates them in order, and returns one output stream per predicate. If a record matches a predicate, it is put into the corresponding output stream and evaluation stops (i.e., no further predicate is evaluated for that record). This ensures that each record is added to at most one output stream, or is dropped if no predicate matches.
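The first-match routing described above can be illustrated without Kafka at all. The following is only a plain-Java sketch of the semantics, not the Kafka Streams implementation; the class name FirstMatchRouting and the method route are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration class; it does not use or depend on Kafka Streams.
public class FirstMatchRouting {

    /**
     * Routes each value to the output list of the first predicate it matches.
     * Values that match no predicate are dropped.
     */
    @SafeVarargs
    public static <T> List<List<T>> route(List<T> input, Predicate<T>... predicates) {
        List<List<T>> outputs = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            outputs.add(new ArrayList<>());
        }
        for (T value : input) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(value)) {
                    outputs.get(i).add(value);
                    break; // stop at the first match: later predicates are not evaluated
                }
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        // First predicate: even numbers. Second predicate: numbers below 4.
        List<List<Integer>> out =
            route(List.of(1, 2, 3, 4, 5, 7), v -> v % 2 == 0, v -> v < 4);
        System.out.println(out); // prints [[2, 4], [1, 3]]; 5 and 7 match nothing and are dropped
    }
}
```

Note that 2 satisfies both predicates but only lands in the first output, which is the "at most one output stream" behavior.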

Thus, you can provide just two predicates to branch(): the first is the same as your original filter() predicate, and the second always returns true.

KStream<String, Model> stream = builder.stream(
    Serdes.String(),
    specificAvroSerde,
    "not-filtered-topic"
);
// branch() returns one output stream per predicate, in the order the predicates are given
KStream<String, Model>[] splitStreams = stream.branch(
    (key, value) -> new Processor().test(key, value), // records that pass the original filter
    (key, value) -> true                               // catch-all: everything else
);
splitStreams[0].to(Serdes.String(), specificAvroSerde, "good-topic");
splitStreams[1].to(Serdes.String(), specificAvroSerde, "bad-topic");

I'm not sure this code is more readable than your original version, though. I guess it's a matter of taste; I personally like your original code better because it expresses the semantics better.

The version I added should be slightly more CPU efficient: for every record that satisfies the predicate, the predicate is evaluated only once, and for every record that does not satisfy it, the second predicate simply returns true (i.e., the original predicate is not evaluated a second time).
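To make the difference concrete, here is a small plain-Java sketch that counts how often the "real" predicate runs under each approach. It only simulates the evaluation order and does not use Kafka Streams; the class PredicateCostDemo, its methods, and the even-number predicate are all made up for the illustration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical illustration class; it only simulates how often the predicate is called.
public class PredicateCostDemo {

    /** Predicate calls when every record goes through both filter() and filterNot(). */
    public static int evaluationsWithFilterPair(List<Integer> values) {
        AtomicInteger calls = new AtomicInteger();
        Predicate<Integer> counted = v -> { calls.incrementAndGet(); return v % 2 == 0; };
        for (Integer v : values) {
            boolean good = counted.test(v);  // stands in for filter(...)
            boolean bad = !counted.test(v);  // stands in for filterNot(...)
        }
        return calls.get();
    }

    /** Predicate calls when a branch-style first match with a catch-all is used. */
    public static int evaluationsWithBranch(List<Integer> values) {
        AtomicInteger calls = new AtomicInteger();
        Predicate<Integer> counted = v -> { calls.incrementAndGet(); return v % 2 == 0; };
        Predicate<Integer> catchAll = v -> true; // trivial, so not counted
        for (Integer v : values) {
            if (counted.test(v)) {
                continue;                    // would go to the "good" stream
            }
            catchAll.test(v);                // would go to the "bad" stream
        }
        return calls.get();
    }

    public static void main(String[] args) {
        List<Integer> values = List.of(1, 2, 3, 4, 5);
        System.out.println(evaluationsWithFilterPair(values)); // prints 10
        System.out.println(evaluationsWithBranch(values));     // prints 5
    }
}
```

The counted predicate runs twice per record in the first variant and once per record in the second, regardless of how many records match.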

If you know that most records will end up in splitStreams[1], you could also invert the predicate (and use splitStreams[0] as the "bad" stream) to decrease the number of calls to the second, always-true predicate. But those are only micro-optimizations and should not matter.

Matthias J. Sax answered Oct 05 '22 23:10
