Merging multiple identical Kafka Streams topics

I have 2 Kafka topics streaming the exact same content from different sources so I can have high availability in case one of the sources fails. I'm attempting to merge the 2 topics into 1 output topic using Kafka Streams 0.10.1.0 such that I don't miss any messages on failures and there are no duplicates when all sources are up.

When using the leftJoin method of KStream, one of the topics can go down with no problem (the secondary topic), but when the primary topic goes down, nothing is sent to the output topic. This seems to be because, according to the Kafka Streams developer guide,

KStream-KStream leftJoin is always driven by records arriving from the primary stream

so if there are no records coming from the primary stream, it will not use the records from the secondary stream even if they exist. Once the primary stream comes back online, output resumes normally.

I've also tried using outerJoin (which adds duplicate records) followed by a conversion to a KTable and groupByKey to get rid of duplicates,

KStream mergedStream = stream1.outerJoin(stream2,
    (streamVal1, streamVal2) -> (streamVal1 == null) ? streamVal2 : streamVal1,
    JoinWindows.of(2000L));

mergedStream.groupByKey()
            .reduce((value1, value2) -> value1, TimeWindows.of(2000L), stateStore)
            .toStream((key, value) -> value)
            .to(outputStream);

but I still get duplicates once in a while. I'm also using commit.interval.ms=200 to get the KTable to send to the output stream often enough.
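For reference, that setting would be passed in through the streams configuration. A minimal sketch (the `StreamsProps` class name and the `"merge-app"` application id are hypothetical; `commit.interval.ms` is the real config key, exposed as `StreamsConfig.COMMIT_INTERVAL_MS_CONFIG` in kafka-streams):

```java
import java.util.Properties;

// Sketch: lowering commit.interval.ms makes Kafka Streams commit (and
// flush the KTable cache downstream) more often, at the cost of more
// frequent commits. The application id below is a hypothetical example.
public class StreamsProps {
    public static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "merge-app"); // hypothetical id
        // Same key as StreamsConfig.COMMIT_INTERVAL_MS_CONFIG
        props.put("commit.interval.ms", "200");
        return props;
    }
}
```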

What would be the best way to approach this merge to get exactly-once output from multiple identical input topics?

asked Nov 24 '16 by Bogdan



1 Answer

Using any kind of join will not solve your problem, as you will always end up with either missing results (inner join, if one stream stalls) or "duplicates" with null values (left or outer join, if both streams are online). See https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics for details on join semantics in Kafka Streams.

Thus, I would recommend using the Processor API, which you can mix and match with the DSL via KStream process(), transform(), or transformValues(). See How to filter keys and value with a Processor using Kafka Stream DSL for more details.

You can also add a custom store to your processor (How to add a custom StateStore to the Kafka Streams DSL processor?) to make the duplicate filtering fault-tolerant.
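A minimal sketch of the deduplication logic such a processor would apply. Everything here is hypothetical: the `DedupFilter` class name and its API are made up for illustration, and a plain `HashMap` stands in for the fault-tolerant `KeyValueStore` a real `Transformer` would use (which Kafka Streams backs with a changelog topic), so the core logic is runnable without a Kafka cluster:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the per-key duplicate filter that a Transformer
// plugged in via KStream#transform() could run. A HashMap stands in for
// the KeyValueStore the real processor would use; with a real store the
// seen-keys state survives failures, making the filtering fault-tolerant.
public class DedupFilter<K, V> {
    private final Map<K, Long> seen = new HashMap<>();
    private final long retentionMs;

    public DedupFilter(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Returns the value if this key has not been seen within the
    // retention window, or null to signal "drop this duplicate".
    public V transform(K key, V value, long timestampMs) {
        Long lastSeen = seen.get(key);
        if (lastSeen != null && timestampMs - lastSeen <= retentionMs) {
            return null; // same record already arrived from the other topic
        }
        seen.put(key, timestampMs);
        return value;
    }
}
```

Because the filter forwards the first copy of each key it sees, it emits a record as soon as either source topic delivers it, so neither topic acts as the "primary" the way it does with leftJoin.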

answered Sep 19 '22 by Matthias J. Sax