 

Kafka Streams: one record to multiple records

Given: I have two topics in Kafka, say topic A and topic B. The Kafka Streams application reads a record from topic A, processes it, and produces multiple records (say recordA and recordB) corresponding to the consumed record. How can I achieve this using Kafka Streams?

KStream<String, List<Message>> producerStreams[] = recordStream.mapValues(new ValueMapper<Message, List<Message>>() {
        @Override
        public List<Message> apply(final Message message) {
          return consumerRecordHandler.process(message);
        }
    }).someFunction(); // someFunction(): placeholder for the step I am asking about

Here, each record read is a Message; after processing, the handler returns a List<Message>. How can I divide this list into two producer streams? Any help will be appreciated.

user2538255 asked Dec 02 '22 11:12



2 Answers

I am not sure if I understand the question correctly, and I also don't understand the answer from @Abhishek :(

If you have an input stream and you want to get zero, one, or more output records per input record, you would apply flatMap() or flatMapValues() (depending on whether you want to modify the key or not).
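A minimal sketch of the flatMapValues() approach, assuming the kafka-streams library (3.x) is on the classpath. To stay self-contained it uses String values instead of the question's Message type; the hypothetical process() helper stands in for consumerRecordHandler.process() and splits a value on commas. The topic names, application id, and broker address are placeholders.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class FlatMapValuesExample {

    // Hypothetical stand-in for consumerRecordHandler.process(message):
    // one input value becomes zero or more output values.
    static List<String> process(String message) {
        if (message == null || message.isEmpty()) {
            return List.of(); // zero output records
        }
        return Arrays.asList(message.split(","));
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input =
            builder.stream("topic-A", Consumed.with(Serdes.String(), Serdes.String()));

        // flatMapValues keeps the key and emits one output record per list element.
        KStream<String, String> output = input.flatMapValues(FlatMapValuesExample::process);

        output.to("topic-B", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "flatmap-example"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        KafkaStreams streams = new KafkaStreams(buildTopology(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
```

With the question's own types, the mapValues(...) call would become recordStream.flatMapValues(consumerRecordHandler::process), provided process() returns a List<Message> and a Serde for Message is configured. Use flatMap() instead if each output record also needs a different key.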

You are also asking, "How can I divide this list to two producer streams?" If you mean splitting one stream into multiple streams, you can use branch().
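A sketch of splitting one stream into two, assuming Kafka Streams 2.8 or later: in those versions KStream#branch() is deprecated in favor of split(), which returns a BranchedKStream. The routing rule isTypeA() and the output topics output-a and output-b are hypothetical, and the input values are assumed to be comma-separated Strings.

```java
import java.util.List;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class SplitExample {

    // Hypothetical routing rule: values starting with "A:" go to one topic,
    // everything else goes to the other.
    static boolean isTypeA(String value) {
        return value != null && value.startsWith("A:");
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> fannedOut = builder
            .stream("topic-A", Consumed.with(Serdes.String(), Serdes.String()))
            // One input record -> several records, one per comma-separated part.
            .flatMapValues(v -> List.of(v.split(",")));

        // Route each record to exactly one branch; each branch writes to its own topic.
        fannedOut.split()
            .branch((key, value) -> isTypeA(value),
                Branched.withConsumer(ks ->
                    ks.to("output-a", Produced.with(Serdes.String(), Serdes.String()))))
            .defaultBranch(
                Branched.withConsumer(ks ->
                    ks.to("output-b", Produced.with(Serdes.String(), Serdes.String()))));

        return builder.build();
    }
}
```

Records matching no predicate go to defaultBranch(); use noDefaultBranch() if they should be dropped instead.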

For more details, refer to the docs: https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#stateless-transformations

Matthias J. Sax answered Dec 06 '22 01:12



What's your key type? I am guessing it's not String. After executing mapValues(), you'll have a KStream<K, List<Message>>. If K is not String, then someFunction() can be a map() that converts K into String (if it is String, you already have the result) and leaves the List<Message> value untouched, since that's your intended end result.
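A sketch of the map() step described above, assuming the input topic has Long keys and String values (the question does not say what K is) and kafka-streams on the classpath. The hypothetical toStringKey() helper converts the key; the value list passes through unchanged. Note that this only changes the key: each record still carries the whole list, so it does not by itself split the list into separate records.

```java
import java.util.List;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

public class RekeyExample {

    // Hypothetical key conversion: Long key to its String form (null stays null).
    static String toStringKey(Long key) {
        return key == null ? null : String.valueOf(key);
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        // Assumption: the input topic has Long keys and String values.
        KStream<Long, String> input =
            builder.stream("topic-A", Consumed.with(Serdes.Long(), Serdes.String()));

        // mapValues produces a List value per record, as in the question.
        KStream<Long, List<String>> lists = input.mapValues(v -> List.of(v.split(",")));

        // map converts the key to String and leaves the List value untouched.
        KStream<String, List<String>> rekeyed =
            lists.map((k, v) -> KeyValue.pair(toStringKey(k), v));

        // Print instead of writing to a topic, to avoid needing a List serde.
        rekeyed.foreach((k, v) -> System.out.println(k + " -> " + v));
        return builder.build();
    }
}
```

If only the key changes, selectKey((k, v) -> toStringKey(k)) expresses the same step more directly.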

Abhishek answered Dec 06 '22 01:12
