Given: I have two topics in Kafka let's say topic A and topic B. The Kafka Stream reads a record from topic A, processes it and produces multiple records (let's say recordA and recordB) corresponding to the consumed record. Now, the question is how can I achieve this using Kafka Streams.
KStream<String, List<Message>> producerStreams[] = recordStream.mapValues(new ValueMapper<Message, List<Message>>() {
@Override
public List<Message> apply(final Message message) {
return consumerRecordHandler.process(message);
}
}).*someFunction*()
Here, the record read is Message; After processing it returns a list of Message. How can I divide this list to two producer streams? Any help will be appreciated.
I am not sure if I understand the question correctly, and I also don't understand the answer from @Abhishek :(
If you have an input stream, and you want to get zero, one, or more output records per input records, you would apply a flatMap()
or flatMapValues()
(depending if you want to modify the key or not).
You are also asking about "How can I divide this list to two producer streams?" If you mean to split one stream into multiple, you can use branch()
.
For more details, I refer to the docs: https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#stateless-transformations
What's your key (type) ? I am guessing its not String
. After executing the mapValues
you'll have this - KStream<K,List<Message>>
. If K
is not String
then someFunction()
can be a map
which will convert K
into String
(if its is, you already have the result) and leave the List<Message>
(the value) untouched since that's your intended end result
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With