I am working on a use case where I have created pipeline which sends data from mongo to elasticsearch.
Mongo -> Spring Boot -> Kafka -> Transformer(KStream) -> Kafka -> Consumer (Send to Elastic Search.)
I have to calculate time taken by record from Mongo to Elastic search. I thought of using Kafka Headers and keep forwarding their values in next Kafka Producer and at the end calculate same by subtracting from current timestamp.
I can send headers from producer but how to send same from Kafka Streams. In below code I want to send headers which I got while consuming inTopic and send them to outTopic.
private StreamsBuilder buildStream(final String bootstrapServers, final String inTopic, final String outTopic) {
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, String> kStream = streamsBuilder.stream(inTopic);
kStream.filter(new Predicate<String, String>() {
public boolean test(String s, String s2) {
return true;
}
})
.to(outTopic);
return streamsBuilder;
}
You can access headers in processor API by ProcessorContext and use DSL and processor API together.
What I would do is create my implementation of Transformer and use it in transform(). This question seems to be similar to yours and it has an answer with code snippet which I believe could be helpful: Set timestamp in output with Kafka Streams
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With