Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Kafka Streams - Processor API - Forward to different topics

Tags:

I have a Processor-API Processor, which internally forwards to several separate sinks (think of an event classifier, although it also has stateful logic between the events). I was thinking of having a join later between two of those topics. Once a join is made, I forward an updated (enriched) version of the elements to those topics I'm actually joining.

How would you mix DSL if in your Processor API code you forward to more than one sink(sink1, sink2) that in turn are sent to topics?

I guess you could you create separate streams, like

val stream1 = builder.stream(outputTopic) 
val stream2 = builder.stream(outputTopic2)

and build from there? However this creates more subtopologies - which are the implications here?

Another possibility is to have your own state store in the Processor API and manage it there, in the same Processor (I'm actually doing that). It adds complexity to the code, but wouldn't it be more efficient? For example, you can delete data you no longer use (once a join is made, you can forward new joined data to sinks and it is no longer eligible for a join). Any other efficiency gotcha?

like image 245
xmar Avatar asked May 14 '18 08:05

xmar


1 Answers

The simplest way might be to mix Processor API with the DSL by starting with a StreamsBuilder and use transform()

StreamsBuilder builder = new StreamsBuilder()
KStream[] streams = builder.stream("input-topic")
                           .transform(/* put your processor API code here */)
                           .branch(...);

KStream joined = streams[0].join(streams[1], ...);

Writing the intermediate streams into topic first and read them back is also possible. The fact that you get more sub-topologies should be of no concern.

Doing the join manually via states is possible but hard to code correctly. If possible, I would recommend to use the provided join operator from the DSL.

like image 61
Matthias J. Sax Avatar answered Sep 28 '22 05:09

Matthias J. Sax