I have a Processor that interacts with a StateStore to filter messages and apply complex logic to them. In the process(key, value) method I use context.forward(key, value) to send the keys and values that I need. For debugging purposes I also print them.
I have a KStream mergedStream that results from a join of two other streams. I want to apply the processor to the records of that stream. I achieve this with: mergedStream.process(myprocessor, "stateStoreName")
When I start this program, I can see the proper values printed to my console. However, if I send the mergedStream to a topic using mergedStream.to("topic"), the values on the topic are not the ones I forwarded in the processor, but the original ones.
I use kafka-streams 0.10.1.0.
What is the best way to get the values I have forwarded in the processor into another stream?
Is it possible to mix the Processor API with the streams created by the KStream DSL?
Short:
To solve your problem you can use transform(...) instead of process(...), which gives you access to the Processor API within the DSL, too.
Long:
If you use process(...) you apply a processor to a stream -- however, this is a "terminating" (or sink) operation (its return type is void), i.e., it does not return any result. Here "sink" only means that the operator has no successor -- it does not imply that any result is written anywhere!
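Furthermore, if you call mergedStream.process(...) and mergedStream.to(...) you basically branch-and-duplicate your stream and send one copy to each downstream operator (i.e., one copy to process and one copy to to). The to operator therefore receives the original records of mergedStream, not the ones you forward in the processor.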
Mixing DSL and Processor API is absolutely possible (you did it already ;)). However, using process(...) you cannot consume the data you forward(...) within the DSL -- if you want to consume the Processor API result, you can use transform(...) instead of process(...) and call to(...) on the KStream that transform(...) returns.
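To make the branching behaviour concrete, here is a minimal toy model in plain Java. It is not the Kafka Streams API: ToyStream, ToyTopologyDemo and their methods are hypothetical stand-ins that only mimic the shape of process (returns void), transform (returns a new stream) and to (writes the stream it is called on). Upper-casing the value is an assumed placeholder for your real processor logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical toy stand-in for a stream of values; NOT the Kafka Streams API. */
final class ToyStream {
    private final List<String> records;

    ToyStream(List<String> records) {
        this.records = new ArrayList<>(records);
    }

    /** Terminal operation: results go to the processor's own output and have no successor. */
    void process(Function<String, String> logic, List<String> forwarded) {
        for (String record : records) {
            forwarded.add(logic.apply(record));
        }
    }

    /** Non-terminal operation: results become a new stream that can be chained. */
    ToyStream transform(Function<String, String> logic) {
        List<String> out = new ArrayList<>();
        for (String record : records) {
            out.add(logic.apply(record));
        }
        return new ToyStream(out);
    }

    /** Sink: writes the records of THIS stream to the given topic. */
    void to(List<String> topic) {
        topic.addAll(records);
    }
}

final class ToyTopologyDemo {
    /** Assumed placeholder for the real processor logic. */
    static String logic(String value) {
        return value.toUpperCase();
    }

    /** Mirrors mergedStream.process(...) followed by mergedStream.to(...). */
    static List<String> topicAfterProcess(List<String> input) {
        ToyStream merged = new ToyStream(input);
        List<String> printed = new ArrayList<>();
        merged.process(ToyTopologyDemo::logic, printed); // branch 1: dead end
        List<String> topic = new ArrayList<>();
        merged.to(topic);                                // branch 2: original records
        return topic;
    }

    /** Mirrors mergedStream.transform(...).to(...). */
    static List<String> topicAfterTransform(List<String> input) {
        ToyStream merged = new ToyStream(input);
        List<String> topic = new ArrayList<>();
        merged.transform(ToyTopologyDemo::logic).to(topic); // single chain
        return topic;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b");
        System.out.println(topicAfterProcess(input));   // prints [a, b]
        System.out.println(topicAfterTransform(input)); // prints [A, B]
    }
}
```

In the first method the topic only ever sees the original records, which matches what you observe. In the second, to is chained onto the stream returned by transform, so the topic receives the processed values.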