I have a Kafka Streams app that connects to our Kafka cluster using the Kafka Streams DSL, like so:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, byte[]> stream = builder.stream(myTopic);
// do work
kStreams = new KafkaStreams(builder, config);
kStreams.start();
Another part of my code base establishes a connection to our cluster using the consumer client directly:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config, keyDeserializer, valueDeserializer);
consumer.subscribe(Collections.singletonList(sourceTopic));
consumer.poll(500L);
// etc
consumer.close();
The reason I am doing this is to gather metadata about the consumer group before conditionally kicking off other parts of the app (which include the Kafka Streams topology). There are probably other ways to do this (e.g. through various hooks), but I am more curious about why mixing these two approaches will intermittently lead to an InconsistentGroupProtocolException being thrown.
Could someone please shed some light on why this is being thrown? I'm having a difficult time determining what exactly is going on from the source code itself, but I guess the underlying consumers constructed by Kafka Streams specify a different partitioning protocol than the KafkaConsumer client. Anyway, any help in understanding this exception would be greatly appreciated.
In such a case you need to assign every topic evenly to all consumers. Because this assignor treats every topic separately, you can expect all consumers to consume from all of the topics. With other assignors, one consumer might end up with all partitions of one topic and no partitions of the others.
The RangeAssignor is the default strategy. The aim of this strategy is to co-locate partitions of several topics on the same consumer. This is useful, for example, to join records from two topics that have the same number of partitions and the same key-partitioning logic.
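To make the co-location idea concrete, here is a minimal sketch of range-style assignment in plain Java. It is a simplified model written for illustration, not the Kafka client's own RangeAssignor code; the class name, the method name, and the topic and consumer names in the usage note are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of range-style assignment (illustration only, not Kafka's implementation).
class RangeAssignmentSketch {

    // Returns consumer id -> assigned partitions, each written as "topic-partition".
    static Map<String, List<String>> assign(List<String> consumers,
                                            Map<String, Integer> partitionsPerTopic) {
        // Consumers are ordered the same way for every topic.
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);

        Map<String, List<String>> result = new TreeMap<>();
        for (String consumer : sorted) {
            result.put(consumer, new ArrayList<>());
        }
        if (sorted.isEmpty()) {
            return result; // nothing to assign to
        }

        // Each topic is divided independently into contiguous ranges.
        for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
            int numPartitions = topic.getValue();
            int base = numPartitions / sorted.size();   // partitions every consumer gets
            int extra = numPartitions % sorted.size();  // first `extra` consumers get one more
            int next = 0;
            for (int i = 0; i < sorted.size(); i++) {
                int count = base + (i < extra ? 1 : 0);
                for (int p = next; p < next + count; p++) {
                    result.get(sorted.get(i)).add(topic.getKey() + "-" + p);
                }
                next += count;
            }
        }
        return result;
    }
}
```

With two consumers c1 and c2 and two topics of four partitions each, c1 receives partitions 0 and 1 of both topics and c2 receives partitions 2 and 3 of both. Equal partition numbers land on the same consumer, which is what makes a join across the two topics possible.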
Consumer partition assignment
Whenever a consumer enters or leaves a consumer group, the brokers rebalance the partitions across consumers, meaning Kafka handles load balancing with respect to the number of partitions per application instance for you. This is great; it's a major feature of Kafka.
You answered it yourself. Kafka Streams uses a custom partition assignor, so a Kafka Streams client only works with other Kafka Streams clients. If you use a KafkaConsumer with the same group ID as your Kafka Streams app, the join fails on purpose: this fences plain KafkaConsumers off from the Kafka Streams consumer group. In short, a KafkaConsumer cannot "play" with Kafka Streams.
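The following toy model may help explain why the failure looks intermittent. It assumes the group coordinator admits a member only if it shares at least one assignment protocol with every member already in the group, and that an empty group accepts anyone. The class, the method, and the protocol names "stream" and "range" are assumptions used for illustration; this is not broker code.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of a consumer group's join check (illustration only, not Kafka broker code).
class GroupProtocolSketch {

    // Protocols supported by every current member; null while the group is empty.
    private Set<String> commonProtocols = null;

    // Returns true if the member is admitted. A false result stands in for the
    // join being rejected with InconsistentGroupProtocolException.
    boolean tryJoin(List<String> memberProtocols) {
        if (commonProtocols == null) {
            // First member: its protocols define what the group supports.
            commonProtocols = new HashSet<>(memberProtocols);
            return true;
        }
        Set<String> shared = new HashSet<>(commonProtocols);
        shared.retainAll(memberProtocols);
        if (shared.isEmpty()) {
            return false; // no protocol in common with the existing members
        }
        commonProtocols = shared;
        return true;
    }
}
```

Under this model, if the Streams app joins first with `List.of("stream")`, a later `tryJoin(List.of("range"))` from the plain consumer is rejected; if the plain consumer joins first, the Streams member is the one rejected. Which side fails depends on who reaches the group first, which is one plausible reading of the intermittent behavior. Giving the metadata-gathering KafkaConsumer its own group ID keeps the two clients out of the same group and avoids the clash.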