How to send headers using KStream

I am working on a use case where I have built a pipeline that sends data from MongoDB to Elasticsearch.

MongoDB -> Spring Boot -> Kafka -> Transformer (KStream) -> Kafka -> Consumer (sends to Elasticsearch)

I have to measure the time a record takes to travel from MongoDB to Elasticsearch. My idea is to use Kafka headers: keep forwarding their values through each subsequent Kafka producer, and at the end compute the elapsed time by subtracting the header value from the current timestamp.
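To make the timing idea concrete, here is a minimal sketch, assuming the header carries an epoch-millisecond timestamp stored as an 8-byte value (Kafka header values are plain byte arrays). The class `LatencyHeader` and the header key `source-timestamp-ms` are hypothetical names chosen for illustration and are not part of Kafka.

```java
import java.nio.ByteBuffer;

// Hypothetical helper (not part of Kafka): encodes and decodes an
// epoch-millisecond timestamp as the 8-byte value of a record header.
final class LatencyHeader {
    // Assumed header key; any key the producer and consumer agree on works.
    static final String KEY = "source-timestamp-ms";

    // Header values are byte arrays, so the long is written big-endian.
    static byte[] encode(long epochMillis) {
        return ByteBuffer.allocate(Long.BYTES).putLong(epochMillis).array();
    }

    // Reads the long back; throws BufferUnderflowException on values shorter than 8 bytes.
    static long decode(byte[] headerValue) {
        return ByteBuffer.wrap(headerValue).getLong();
    }

    // Elapsed time: the given "now" minus the timestamp stored in the header.
    static long elapsedMillis(byte[] headerValue, long nowMillis) {
        return nowMillis - decode(headerValue);
    }
}
```

The first producer would attach `LatencyHeader.encode(System.currentTimeMillis())` under `LatencyHeader.KEY`, and the final consumer would call `elapsedMillis` with its own current time. This only gives meaningful numbers if the clocks of the machines involved are reasonably in sync.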

I can send headers from the producer, but how do I send them from Kafka Streams? In the code below, I want to take the headers I receive when consuming from inTopic and send them to outTopic.

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

private StreamsBuilder buildStream(final String bootstrapServers, final String inTopic, final String outTopic) {
    StreamsBuilder streamsBuilder = new StreamsBuilder();
    KStream<String, String> kStream = streamsBuilder.stream(inTopic);

    // Pass-through filter: every record is kept and written to outTopic.
    kStream.filter((key, value) -> true)
           .to(outTopic);
    return streamsBuilder;
}
asked May 28 '26 at 01:05 by cody123


1 Answer

You can access headers through the Processor API's ProcessorContext, and you can use the DSL and the Processor API together.

What I would do is create my own implementation of Transformer and use it in transform(). This question seems similar to yours, and it has an answer with a code snippet that I believe could be helpful: Set timestamp in output with Kafka Streams
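A rough sketch of such a Transformer, assuming String keys and values as in the question. The class name `HeaderStampingTransformer` and the header key `stream-timestamp-ms` are made up for illustration. As far as I know, headers already present on the input record stay on the record when it is forwarded (Kafka Streams 2.0 and later), so the transformer only needs to read or add to `context.headers()`; please verify this against your version.

```java
import java.nio.ByteBuffer;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// Sketch of a Transformer that reaches record headers through ProcessorContext.
// The class name and header key are hypothetical.
final class HeaderStampingTransformer
        implements Transformer<String, String, KeyValue<String, String>> {

    static final String STAGE_HEADER = "stream-timestamp-ms"; // assumed key

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context; // kept so transform() can reach the headers
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        // Headers of the record currently being processed; existing ones are left in place.
        markStage(context.headers(), System.currentTimeMillis());
        return KeyValue.pair(key, value); // key and value pass through unchanged
    }

    @Override
    public void close() {
        // nothing to release
    }

    // Adds an 8-byte epoch-millisecond timestamp under STAGE_HEADER.
    static void markStage(Headers headers, long epochMillis) {
        headers.add(STAGE_HEADER, ByteBuffer.allocate(Long.BYTES).putLong(epochMillis).array());
    }
}
```

In the question's code it would take the place of the filter step, e.g. `kStream.transform(HeaderStampingTransformer::new).to(outTopic);`. Newer Kafka Streams releases deprecate `transform()` in favour of the `process()`-based API, so check which one your version expects.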

answered May 31 '26 at 12:05 by redfox