I have an application that needs to listen to multiple topics, and each topic has its own logic for handling messages. I had planned to use the same Kafka properties for every KafkaStreams instance, but I get an error like the one below.
Error
java.lang.IllegalArgumentException: Assigned partition my-topic-1 for non-subscribed topic regex pattern; subscription pattern is my-other-topic
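Code (Kotlin)
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.KStreamBuilder
import org.slf4j.LoggerFactory

class KafkaSetup {
    companion object {
        // KafkaSetup::class.java names the outer class; this::class.java here would name the Companion
        private val LOG = LoggerFactory.getLogger(KafkaSetup::class.java)
    }

    fun getProperties(): Properties {
        val properties = Properties()
        // Both KafkaStreams instances below share this application.id (and hence one consumer group)
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app")
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker address
        return properties
    }

    fun start() {
        listenOnMyTopic()
        listenOnMyOtherTopic()
    }

    private fun listenOnMyTopic() {
        val kStreamBuilder = KStreamBuilder()
        // Explicit String serdes so the String-typed lambda receives Strings, not byte arrays
        val kStream: KStream<String, String> = kStreamBuilder.stream(Serdes.String(), Serdes.String(), "my-topic")
        kStream.foreach { _, _ -> LOG.info("do stuff") }
        KafkaStreams(kStreamBuilder, getProperties()).start()
    }

    private fun listenOnMyOtherTopic() {
        val kStreamBuilder = KStreamBuilder()
        val kStream: KStream<String, String> = kStreamBuilder.stream(Serdes.String(), Serdes.String(), "my-other-topic")
        kStream.foreach { _, _ -> LOG.info("do other stuff") }
        KafkaStreams(kStreamBuilder, getProperties()).start()
    }
}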
I found a reference suggesting that you cannot use the same application.id for multiple topics; however, I am having trouble finding reference documentation that supports this. The documentation for application.id states:
An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.
Questions
Can I use the same application.id to start two KafkaStreams instances that are listening on different topics? If so, how?
Details: Kafka 0.11.0.2
Kafka Streams scales via partitions, not topics. Thus, if you start multiple application instances with the same application.id, they must be identical with regard to the input topics they subscribe to and their processing logic. The application forms a consumer group using the application.id as its group.id, and different partitions of the input topic(s) are therefore assigned to different instances.
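A minimal sketch of that scaling model, assuming the 0.11-era KStreamBuilder API used in the question and a broker at localhost:9092 (an assumption). Every instance runs exactly this program with the same application.id, so the instances join one consumer group and split the partitions of my-topic between them. The helper names buildMyTopicTopology and streamsProperties are hypothetical.

```kotlin
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStreamBuilder

// Hypothetical helper: every instance of "my-app" must build this identical topology.
fun buildMyTopicTopology(): KStreamBuilder {
    val builder = KStreamBuilder()
    builder.stream(Serdes.String(), Serdes.String(), "my-topic")
        .foreach { key, value -> println("do stuff with $key -> $value") }
    return builder
}

// Hypothetical helper: the application.id is also used as the consumer group.id.
fun streamsProperties(applicationId: String): Properties {
    val props = Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker address
    return props
}

fun main() {
    // Start this same program several times (e.g. on several hosts);
    // each running instance is assigned a subset of the partitions of "my-topic".
    val streams = KafkaStreams(buildMyTopicTopology(), streamsProperties("my-app"))
    Runtime.getRuntime().addShutdownHook(Thread { streams.close() })
    streams.start()
}
```

Because work is divided by partition, starting more instances than my-topic has partitions would leave the extra instances idle in this single-topic topology.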
If you have different topics with the same logic, you can subscribe to all of them at once (in every instance you start). Scaling is still based on partitions, though. (It's basically a "merge" of your input topics.)
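As a sketch under the same assumptions (0.11-era KStreamBuilder API, broker at localhost:9092, hypothetical helper name buildMergedTopology), a single source subscribed to both topics looks like this. Every instance runs the identical program with the identical application.id, and the instances still divide the work by partition, now across the partitions of both topics.

```kotlin
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.KStreamBuilder

// Hypothetical helper: one source node subscribed to BOTH topics, so records from
// every partition of my-topic and my-other-topic go through the same logic.
fun buildMergedTopology(): KStreamBuilder {
    val builder = KStreamBuilder()
    val merged: KStream<String, String> =
        builder.stream(Serdes.String(), Serdes.String(), "my-topic", "my-other-topic")
    merged.foreach { key, value -> println("same logic for $key -> $value") }
    return builder
}

fun main() {
    val props = Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker address
    // Every instance runs this identical program with the identical application.id.
    val streams = KafkaStreams(buildMergedTopology(), props)
    Runtime.getRuntime().addShutdownHook(Thread { streams.close() })
    streams.start()
}
```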
If you want to scale via topics and/or have different processing logic, you must use a different application.id for each Kafka Streams application.
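Applied to the question's code, a minimal sketch (0.11-era API, broker address assumed) gives each processing logic its own application.id, and therefore its own consumer group, changelog-topic prefix, and client-id prefix. The helper names propertiesFor and startApp, and the ids "my-app-my-topic" and "my-app-my-other-topic", are hypothetical.

```kotlin
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStreamBuilder

// Hypothetical helper: properties for ONE Kafka Streams application.
fun propertiesFor(applicationId: String): Properties {
    val props = Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker address
    return props
}

// Hypothetical helper: one KafkaStreams instance per topic, each with its own application.id.
fun startApp(applicationId: String, topic: String, handler: (String?, String?) -> Unit): KafkaStreams {
    val builder = KStreamBuilder()
    builder.stream(Serdes.String(), Serdes.String(), topic)
        .foreach { key, value -> handler(key, value) }
    val streams = KafkaStreams(builder, propertiesFor(applicationId))
    streams.start()
    return streams
}

fun main() {
    // Distinct ids mean distinct consumer groups, so the two subscriptions never clash.
    val first = startApp("my-app-my-topic", "my-topic") { _, _ -> println("do stuff") }
    val second = startApp("my-app-my-other-topic", "my-other-topic") { _, _ -> println("do other stuff") }
    Runtime.getRuntime().addShutdownHook(Thread {
        first.close()
        second.close()
    })
}
```

Per the application.id documentation, the ids only need to be unique within the Kafka cluster; deriving them from the topic name is just one possible convention.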