Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

KafkaStreams multiple streams in same application

I'm trying to make a practical design decision based on convention and plausibility with KafkaStreams.

Let's say that I have two different events that I want to place into KTables. I have a producer sending these messages to a KStream that is listening to that topic.

From what I can tell I cannot use conditional forwarding for messages using KafkaStreams, so if the stream is subscribe to many topics (one for each of the above messages, for example) I can only call stream.to on a single sink topic - otherwise, I would have to do something like call foreach on the stream and send messages with a KProducer to the sink topics.

The above suggests using a single stream. I thought I could set up multiple streams in the same app, each listening to a topic, mapping and forwarding to a table sink, but everytime I try to create two instances of KafkaStreams, only the first initialized subscribes to its topic - the other gets a warning from the client that its topic has no subscriptions.

Can I set up multiple streams in the same app? If so, are there any special requirements?

    class Stream(topic: String) {
      val props: Option[Map[String, String]] = Some(TopicProps.get(topic))
      val streamsBuilder = new StreamsBuilder
      val topics = new util.ArrayList[String]
      topics.add(props.get("topic"))

      val stream: KStream[String, String] = configureStream(streamsBuilder, topics, props.get("sink"))

      def configureStream(builder: StreamsBuilder, topics: java.util.List[String], sink: String): KStream[String, String] = {
        builder.stream[String, String](
          topics,
          Consumed.`with`(String(), String())
        )
      }

      def init(): KafkaStreams = {
        val streams = new KafkaStreams(streamsBuilder.build(), KafkaConfig.streamConfig)

        streams.start()

        streams
      }
    }

    class Streams() {

      val eventStream = new Stream("first_event") //looking good!
      val eventStream2 = new Stream("second_event") // no subscribers
      //if I switch the other of these, eventStream2 is subscribed to and eventStream is dead in the water
      val streams: KafkaStreams = eventStream.init()
      val streams2: KafkaStreams = eventStream2.init()

    }

stream config

    val streamConfig: Properties = {
        val properties = new Properties()
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-application")
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BrokerHost)
        properties
    }

I'd also love any alternatives suggested

like image 929
nbpeth Avatar asked Jun 05 '26 21:06

nbpeth


2 Answers

When you are creating your KafkaStreams you need to pass property with different application.id, for example:

    props.put(StreamsConfig.APPLICATION_ID_CONFIG,"APP1");
    StreamsBuilder builder = new SteamsBuilder();
    KStream stream1 = builder.stream("topic1");
    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

And then you should create another stream:

    props.put(StreamsConfig.APPLICATION_ID_CONFIG,"APP2");
    StreamsBuilder builder = new SteamsBuilder();
    KStream stream2 = builder.stream("topic2");
    KafkaStreams streams2 = new KafkaStreams(builder, props);
    streams2.start();
like image 114
Bohdan Myslyvchuk Avatar answered Jun 07 '26 23:06

Bohdan Myslyvchuk


From what I can tell I cannot use conditional forwarding for messages

Do you know about KStream#split() (KStream#branch() in order versions)? It basically the same as conditional forwarding.

I thought I could set up multiple streams in the same app, each listening to a topic, mapping and forwarding to a table sink,

This should work as follows:

StreamsBuilder builder = new SteamsBuilder();
KStream stream1 = builder.stream("topic1");
KStream stream2 = builder.stream("topic2");

stream1.to("table1-topic");
stream2.to("table2-topic");

but everytime I try to create two instances of KafkaStreams, only the first initialized subscribes to its topic - the other gets a warning from the client that its topic has no subscriptions.

Not sure. This should work. Maybe you can share your code?

like image 40
Matthias J. Sax Avatar answered Jun 08 '26 00:06

Matthias J. Sax



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!