 

Kafka Streams: use the same `application.id` to consume from multiple topics

I have an application that needs to listen to multiple different topics; each topic has separate logic for how its messages are handled. I had planned to use the same Kafka properties for each KafkaStreams instance, but I get an error like the one below.

Error

java.lang.IllegalArgumentException: Assigned partition my-topic-1 for non-subscribed topic regex pattern; subscription pattern is my-other-topic

Code (kotlin)

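import java.util.Properties
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.KStreamBuilder
import org.slf4j.LoggerFactory

class KafkaSetup {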
    companion object {
        private val LOG = LoggerFactory.getLogger(this::class.java)
    }

    fun getProperties(): Properties {
        val properties = Properties()
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app")
        return properties
    }

    private fun listenOnMyTopic() {
        val kStreamBuilder = KStreamBuilder()
        val kStream: KStream<String, String> = kStreamBuilder.stream("my-topic")

        kStream.foreach { key, value -> LOG.info("do stuff") }

        val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
        kafkaStreams.start()
    }

    private fun listenOnMyOtherTopic() {
        val kStreamBuilder = KStreamBuilder()
        val kStream: KStream<String, String> = kStreamBuilder.stream("my-other-topic")

        kStream.foreach { key, value -> LOG.info("do other stuff") }

        val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
        kafkaStreams.start()
    }
}

I found this reference, which suggests that you cannot use the same application.id for multiple topics, but I am finding it hard to find reference documentation to support that. The documentation for application.id states:

An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.

Questions

  1. What does this error mean, and what causes it?
  2. Given that you can have multiple instances of your app running with the same id to consume from multiple topic partitions, what does "Must be unique within the Kafka cluster" mean?
  3. Can you use the same Kafka Streams application.id to start two KafkaStreams instances that are listening on different topics? If so, how?

Details: Kafka 0.11.0.2

Mike Rylander asked Dec 27 '17 18:12



1 Answer

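Kafka Streams scales via partitions, not topics. Thus, if you start multiple instances with the same application.id, they must be identical with regard to the input topics they subscribe to and their processing logic. The instances form a consumer group using the application.id as group.id, and thus different partitions of the input topic(s) are assigned to different instances. In your case, the two KafkaStreams instances join the same group with different subscriptions, so one of them can be handed a partition of a topic it never subscribed to, which is what the exception reports.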

If you have different topics with the same logic, you can subscribe to all of the topics at once (in each instance you start). Scaling is still based on partitions, though. (It's basically a "merge" of your input topics.)
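
A minimal sketch of that "merge" approach, assuming the 0.11 `KStreamBuilder` API used in the question. The broker address `localhost:9092` and the helper names `sharedProperties`, `buildMergedTopology`, and `startMerged` are placeholders for illustration, not part of the original code. On Kafka 1.0 and later, `StreamsBuilder` replaces `KStreamBuilder`, so the builder calls would differ there.

```kotlin
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStreamBuilder

// Both topics feed a single stream, so every instance has the same subscription.
val inputTopics = arrayOf("my-topic", "my-other-topic")

fun sharedProperties(): Properties {
    val properties = Properties()
    properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "my-app")
    // Assumption: placeholder broker address, replace with your own.
    properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    return properties
}

fun buildMergedTopology(): KStreamBuilder {
    val builder = KStreamBuilder()
    // One stream over all input topics; the same logic handles every record.
    builder.stream(Serdes.String(), Serdes.String(), *inputTopics)
        .foreach { key, value -> println("same logic for $key -> $value") }
    return builder
}

// Starts one instance; run the same code in every instance you deploy.
fun startMerged(): KafkaStreams {
    val streams = KafkaStreams(buildMergedTopology(), sharedProperties())
    streams.start()
    return streams
}
```

Every instance started this way is identical, so sharing one application.id is safe, and the partitions of both topics are spread across the running instances.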

If you want to scale via topics and/or have different processing logic, you must use a different application.id for each Kafka Streams application.
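
Applied to the code in the question, this could look roughly like the sketch below, again assuming the 0.11 `KStreamBuilder` API. The ids `my-app-my-topic` and `my-app-my-other-topic`, the broker address, and the helpers `propertiesFor`, `startTopology`, and `startAll` are made up for illustration; any ids work as long as each topology gets its own.

```kotlin
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStreamBuilder

fun propertiesFor(applicationId: String): Properties {
    val properties = Properties()
    // Each topology gets its own application.id, and therefore its own consumer group.
    properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
    // Assumption: placeholder broker address, replace with your own.
    properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    return properties
}

fun startTopology(
    applicationId: String,
    topic: String,
    handle: (String?, String?) -> Unit
): KafkaStreams {
    val builder = KStreamBuilder()
    builder.stream(Serdes.String(), Serdes.String(), topic)
        .foreach { key, value -> handle(key, value) }
    val streams = KafkaStreams(builder, propertiesFor(applicationId))
    streams.start()
    return streams
}

// Two independent Kafka Streams applications running in the same JVM.
fun startAll(): List<KafkaStreams> = listOf(
    startTopology("my-app-my-topic", "my-topic") { _, _ -> println("do stuff") },
    startTopology("my-app-my-other-topic", "my-other-topic") { _, _ -> println("do other stuff") }
)
```

Because the two topologies no longer share a group, neither can be assigned partitions of the other's topic, and each can be scaled independently by starting more instances with the same id and the same topology.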

Matthias J. Sax answered Oct 19 '22 12:10
