
Multiple streams from a single master topic

How can I make multiple streams from a single master topic? When I do something like this:

KStreamBuilder builder = new KStreamBuilder();

builder.stream(Serdes.String(), Serdes.String(), "master")
            /* Filtering logic */
            .to(Serdes.String(), Serdes.String(), "output1");

builder.stream(Serdes.String(), Serdes.String(), "master")
            /* Filtering logic */
            .to(Serdes.String(), Serdes.String(), "output2");

KafkaStreams streams = new KafkaStreams(builder, /* config */);

I get the following error:

org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic master has already been registered by another source.
    at org.apache.kafka.streams.processor.TopologyBuilder.addSource(TopologyBuilder.java:347)
    at org.apache.kafka.streams.kstream.KStreamBuilder.stream(KStreamBuilder.java:92)

Do I need to make another instance of KafkaStreams for each stream from "master"?

bm1729 asked Feb 23 '17 08:02



2 Answers

You don't need a separate KafkaStreams instance. Create the KStream once, so that "master" is registered as a source only once, and reuse it:

KStream<String, String> inputStream = builder.stream(Serdes.String(), Serdes.String(), "master");

then apply each filter to it separately:

inputStream
        /* Filtering logic */
        .to(Serdes.String(), Serdes.String(), "output1");
inputStream
        /* Filtering logic */
        .to(Serdes.String(), Serdes.String(), "output2");

KafkaStreams streams = new KafkaStreams(builder, /* config */);
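
To make the effect on the records concrete: when one stream is reused with two independent filters, every filter sees every record, so a record can land in both outputs or in neither. Below is a minimal plain-Java model of that behaviour. It is not Kafka Streams code; the class FanOutModel, the route helper and the sample predicates are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model: each call to route() stands in for one
// "inputStream -> filter -> to(outputTopic)" chain on the shared stream.
public class FanOutModel {

    // Returns the values of the shared input that pass the given filter.
    static List<String> route(List<String> input, Predicate<String> filter) {
        List<String> out = new ArrayList<>();
        for (String value : input) {
            if (filter.test(value)) {
                out.add(value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> master = Arrays.asList("apple", "avocado", "banana", "kiwi");

        // Two independent filters over the same input.
        List<String> output1 = route(master, v -> v.startsWith("a"));
        List<String> output2 = route(master, v -> v.length() > 5);

        System.out.println(output1); // [apple, avocado]
        System.out.println(output2); // [avocado, banana]
    }
}
```

Here "avocado" passes both filters and shows up in both outputs, while "kiwi" passes neither and is not written anywhere.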
Clemens Valiente answered Oct 12 '22 01:10


You can also use branch() to achieve this. Start from the same input stream:

KStream<String, String> inputStream = builder.stream(Serdes.String(), Serdes.String(), "master");

Calling branch() on it returns an array of streams, one per predicate:

final KStream<String, String>[] splitStream = inputStream.branch(
        new Predicate<String, String>() {
            public boolean test(String key, String value) {
                // write logic to filter
                return true;
            }
        },
        new Predicate<String, String>() {
            public boolean test(String key, String value) {
                // write logic to filter
                return true;
            }
        }
        // you can pass more predicates to split inputStream further
);

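Once branching is done, splitStream[0] contains the records that match the first predicate, splitStream[1] contains the records that did not match the first predicate but match the second, and so on. Records that match no predicate are dropped. To send a branch to an output topic, call to() on it, for example splitStream[0].to(Serdes.String(), Serdes.String(), "output1").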
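
One thing to keep in mind with branch() is that predicates are evaluated in order and each record goes to the first one that matches, so a record never ends up in two branches. The following is a rough plain-Java sketch of that first-match routing. It only models the semantics and does not use the Kafka Streams API; BranchModel, its branch helper and the sample predicates are hypothetical, and for brevity the predicates look at the value only rather than at key and value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of first-match routing, not Kafka Streams code.
public class BranchModel {

    // Returns one list per predicate. Each value is added to the list of the
    // first predicate it satisfies; values matching no predicate are dropped.
    @SafeVarargs
    static List<List<String>> branch(List<String> input, Predicate<String>... predicates) {
        List<List<String>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            branches.add(new ArrayList<>());
        }
        for (String value : input) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(value)) {
                    branches.get(i).add(value);
                    break; // first match wins, later predicates are not tried
                }
            }
        }
        return branches;
    }

    public static void main(String[] args) {
        List<String> master = Arrays.asList("apple", "avocado", "banana", "kiwi");

        List<List<String>> split = branch(master,
                v -> v.startsWith("a"),
                v -> v.length() > 5);

        System.out.println(split.get(0)); // [apple, avocado]
        System.out.println(split.get(1)); // [banana]
    }
}
```

"avocado" also satisfies the second predicate, but it was already claimed by the first, so it appears only in the first branch. If a record should be able to reach several outputs, reusing the stream with independent filters is the better fit.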

Raghavendra Acharya answered Oct 12 '22 01:10
