I have a Processor-API Processor, which internally forwards to several separate sinks (think of an event classifier, although it also has stateful logic between the events). I was thinking of having a join later between two of those topics. Once a join is made, I forward an updated (enriched) version of the elements to those topics I'm actually joining.
How would you mix in the DSL if your Processor API code forwards to more than one sink (sink1, sink2), each of which writes to a topic?
I guess you could create separate streams, like
val stream1 = builder.stream(outputTopic)
val stream2 = builder.stream(outputTopic2)
and build from there? However, this creates more sub-topologies. What are the implications here?
Another possibility is to have your own state store in the Processor API and manage it there, in the same Processor (I'm actually doing that). It adds complexity to the code, but wouldn't it be more efficient? For example, you can delete data you no longer use (once a join is made, you can forward new joined data to sinks and it is no longer eligible for a join). Any other efficiency gotcha?
The simplest way might be to mix the Processor API with the DSL: start with a StreamsBuilder and use transform():
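StreamsBuilder builder = new StreamsBuilder();
// transform() embeds the Processor API logic; branch() splits its output into one KStream per predicate
KStream[] streams = builder.stream("input-topic")
    .transform(/* put your processor API code here */)
    .branch(...);
// join two of the branches with the DSL join operator
KStream joined = streams[0].join(streams[1], ...);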
Writing the intermediate streams to topics first and reading them back is also possible. The additional sub-topologies this creates should be of no concern.
Doing the join manually via state stores is possible but hard to code correctly. If possible, I would recommend using the join operator provided by the DSL.
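To give a feel for what the manual approach involves, here is a minimal plain-Java sketch of the "join, forward, then evict" idea from the question. It is not Kafka Streams code: the two HashMaps only stand in for the key-value state stores a Processor would own, and the class and method names (ManualJoinSketch, onLeft, onRight, forwarded, buffered) are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for a manual two-sided join inside a single Processor.
// The maps play the role of state stores; nothing here is Kafka Streams API.
class ManualJoinSketch {
    // Unmatched values per side, keyed by join key (assumed layout).
    private final Map<String, String> leftBuffer = new HashMap<>();
    private final Map<String, String> rightBuffer = new HashMap<>();
    // Joined results; a real Processor would forward these to its sinks instead.
    private final List<String> forwarded = new ArrayList<>();

    void onLeft(String key, String value) {
        // remove() evicts the partner: once joined, it is no longer eligible for a join
        String match = rightBuffer.remove(key);
        if (match != null) {
            forwarded.add(key + ":" + value + "+" + match);
        } else {
            leftBuffer.put(key, value); // no partner yet, so buffer this side
        }
    }

    void onRight(String key, String value) {
        String match = leftBuffer.remove(key);
        if (match != null) {
            forwarded.add(key + ":" + match + "+" + value);
        } else {
            rightBuffer.put(key, value);
        }
    }

    List<String> forwarded() { return forwarded; }

    int buffered() { return leftBuffer.size() + rightBuffer.size(); }
}
```

Even this toy version leaves open questions: a second unmatched record with the same key silently overwrites the first, and records that never find a partner stay buffered forever. Handling those cases (plus time windows and out-of-order data) is where a hand-written join tends to get hard, which is why the DSL join is usually the safer choice.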