I'm trying to make a practical design decision with Kafka Streams, based on what is conventional and what is feasible.
Let's say I have two different events that I want to place into KTables. A producer sends these messages to a topic, and a KStream consumes from that topic.
From what I can tell, I cannot use conditional forwarding for messages in Kafka Streams. So if the stream is subscribed to many topics (one for each of the above messages, for example), I can only call stream.to on a single sink topic. Otherwise, I would have to do something like call foreach on the stream and send messages to the sink topics with a KafkaProducer.
The above suggests using a single stream. I thought I could set up multiple streams in the same app, each listening to a topic, mapping, and forwarding to a table sink. But every time I try to create two instances of KafkaStreams, only the first one initialized subscribes to its topic; the other gets a warning from the client that its topic has no subscriptions.
Can I set up multiple streams in the same app? If so, are there any special requirements?
class Stream(topic: String) {
  val props: Option[Map[String, String]] = Some(TopicProps.get(topic))
  val streamsBuilder = new StreamsBuilder
  val topics = new util.ArrayList[String]
  topics.add(props.get("topic"))
  val stream: KStream[String, String] = configureStream(streamsBuilder, topics, props.get("sink"))

  def configureStream(builder: StreamsBuilder, topics: java.util.List[String], sink: String): KStream[String, String] = {
    builder.stream[String, String](
      topics,
      Consumed.`with`(String(), String())
    )
  }

  def init(): KafkaStreams = {
    val streams = new KafkaStreams(streamsBuilder.build(), KafkaConfig.streamConfig)
    streams.start()
    streams
  }
}
class Streams() {
  val eventStream = new Stream("first_event") // looking good!
  val eventStream2 = new Stream("second_event") // no subscribers
  // if I switch the order of these, eventStream2 is subscribed to and eventStream is dead in the water
  val streams: KafkaStreams = eventStream.init()
  val streams2: KafkaStreams = eventStream2.init()
}
Stream config:
val streamConfig: Properties = {
  val properties = new Properties()
  properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-application")
  properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BrokerHost)
  properties
}
I'd also welcome any suggested alternatives.
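When you create each KafkaStreams instance, you need to pass it properties with a different application.id. For example:
// props is assumed to already hold the shared settings (e.g. bootstrap.servers)
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "APP1");
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream1 = builder.stream("topic1");
// KafkaStreams takes a Topology, so call build() on the builder
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();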
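Then create the second instance with its own builder and its own application.id:
// copy the shared settings so the two instances do not share one Properties object
Properties props2 = new Properties();
props2.putAll(props);
props2.put(StreamsConfig.APPLICATION_ID_CONFIG, "APP2");
StreamsBuilder builder2 = new StreamsBuilder();
KStream<String, String> stream2 = builder2.stream("topic2");
KafkaStreams streams2 = new KafkaStreams(builder2.build(), props2);
streams2.start();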
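The configuration in the question uses one shared Properties object with a single application.id for both instances. A minimal sketch of deriving a separate config per instance is below. StreamConfigs and forInstance are hypothetical names, not part of Kafka; the literal key "application.id" is the value of StreamsConfig.APPLICATION_ID_CONFIG, written as a string here so the snippet needs only the JDK.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka): builds one config per KafkaStreams instance.
final class StreamConfigs {
    private StreamConfigs() {}

    // Returns a copy of the shared settings with its own application.id.
    // The base object is left untouched so it can be reused for other instances.
    static Properties forInstance(Properties base, String applicationId) {
        Properties copy = new Properties();
        copy.putAll(base); // shared settings such as bootstrap.servers
        // "application.id" is the value of StreamsConfig.APPLICATION_ID_CONFIG
        copy.put("application.id", applicationId);
        return copy;
    }
}
```

Each KafkaStreams instance would then be given its own result, for example forInstance(base, "first-event-app") for one and forInstance(base, "second-event-app") for the other (both ids are made up for illustration).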
"From what I can tell I cannot use conditional forwarding for messages"
Do you know about KStream#split() (KStream#branch() in older versions)? It is basically the same as conditional forwarding.
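A rough sketch of conditional forwarding with split(), assuming Kafka Streams 2.8 or newer on the classpath. The input topic name "events", the "first:" / "second:" value prefixes, and the class name BranchingTopology are made up for illustration; the sink topic names are borrowed from the example further down. Only the topology is built here, so no broker is needed to construct it.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

// Hypothetical example class: routes records from one topic to two sink topics.
final class BranchingTopology {
    private BranchingTopology() {}

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        // "events" is an assumed input topic carrying both event types
        KStream<String, String> events =
            builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()));

        events.split()
            // records whose value starts with "first:" go to table1-topic
            .branch((key, value) -> value != null && value.startsWith("first:"),
                    Branched.withConsumer(ks ->
                        ks.to("table1-topic", Produced.with(Serdes.String(), Serdes.String()))))
            // records whose value starts with "second:" go to table2-topic
            .branch((key, value) -> value != null && value.startsWith("second:"),
                    Branched.withConsumer(ks ->
                        ks.to("table2-topic", Produced.with(Serdes.String(), Serdes.String()))))
            // anything matching neither predicate is dropped
            .noDefaultBranch();

        return builder.build();
    }
}
```

The returned Topology can be passed to a single KafkaStreams instance, so one application.id is enough in this variant.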
"I thought I could set up multiple streams in the same app, each listening to a topic, mapping and forwarding to a table sink,"
This should work as follows:
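// one builder, one topology: both streams run inside the same KafkaStreams instance
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream1 = builder.stream("topic1");
KStream<String, String> stream2 = builder.stream("topic2");
stream1.to("table1-topic");
stream2.to("table2-topic");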
"but every time I try to create two instances of KafkaStreams, only the first initialized subscribes to its topic - the other gets a warning from the client that its topic has no subscriptions."
Not sure. This should work. Maybe you can share your code?