Given: I have two topics in Kafka, say topic A and topic B. A Kafka Streams application reads a record from topic A, processes it, and produces multiple records (say recordA and recordB) for each consumed record. How can I achieve this using Kafka Streams?
KStream<String, List<Message>> producerStreams[] = recordStream.mapValues(new ValueMapper<Message, List<Message>>() {
        @Override
        public List<Message> apply(final Message message) {
          return consumerRecordHandler.process(message);
        }
    }).*someFunction*()
Here, the record read is a Message; after processing, the handler returns a list of Message. How can I divide this list into two producer streams? Any help will be appreciated.
I am not sure if I understand the question correctly, and I also don't understand the answer from @Abhishek :(
If you have an input stream and you want to get zero, one, or more output records per input record, you would apply flatMap() or flatMapValues() (depending on whether you want to modify the key or not).
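As a rough illustration of the flatMapValues() approach, here is a minimal sketch. It assumes String keys and values instead of the question's Message type, and the topic names "input-topic" and "output-topic" as well as the process() helper are hypothetical stand-ins (process() plays the role of consumerRecordHandler.process(message)).

```java
import java.util.Arrays;
import java.util.List;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class FlatMapValuesSketch {

    // Hypothetical stand-in for consumerRecordHandler.process(message):
    // turns one input value into a list of output values.
    static List<String> process(String message) {
        return Arrays.asList(message + "-recordA", message + "-recordB");
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Topic names are assumptions made for this sketch.
        KStream<String, String> recordStream =
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // flatMapValues() emits one output record per list element and keeps the key unchanged.
        KStream<String, String> flattened =
            recordStream.flatMapValues(FlatMapValuesSketch::process);

        flattened.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }
}
```

With mapValues() the result would be a single record holding the whole list; with flatMapValues() each list element becomes its own record, which is usually what is wanted before writing to a topic.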
You also ask "How can I divide this list to two producer streams?" If you mean splitting one stream into multiple streams, you can use branch() (deprecated since Kafka 2.8 in favor of split()).
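A possible sketch of the splitting step is shown below. Note that it uses split() rather than branch(), so it assumes Kafka Streams 2.8 or newer. The String value type, the isRecordA() predicate, the "out-" prefix, and all topic names are hypothetical choices for illustration only; the real routing rule depends on what distinguishes recordA from recordB in your Message type.

```java
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;

public class SplitSketch {

    // Hypothetical rule deciding which output stream a value belongs to.
    static boolean isRecordA(String value) {
        return value.endsWith("-recordA");
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Assumed to already contain one record per output message
        // (for example, the result of flatMapValues()).
        KStream<String, String> flattened =
            builder.stream("flattened-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // Each record goes to the first branch whose predicate matches;
        // everything else lands in the default branch.
        // Map keys are the Named prefix plus the Branched name.
        Map<String, KStream<String, String>> branches = flattened
            .split(Named.as("out-"))
            .branch((key, value) -> isRecordA(value), Branched.as("a"))
            .defaultBranch(Branched.as("b"));

        branches.get("out-a").to("output-a", Produced.with(Serdes.String(), Serdes.String()));
        branches.get("out-b").to("output-b", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }
}
```

On versions older than 2.8, branch(predicateA, predicateB) plays the same role and returns an array of KStream instead of a map.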
For more details, I refer to the docs: https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#stateless-transformations
What's your key type? I am guessing it's not String. After executing mapValues you'll have a KStream<K, List<Message>>. If K is not String, then someFunction() can be a map that converts K into String (if it is, you already have the result) and leaves the List<Message> (the value) untouched, since that's your intended end result.