Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

KafkaStreams - InconsistentGroupProtocolException

I have a Kafka Streams app that connects to our Kafka cluster using the Kafka Streams DSL, like so:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, byte[]> stream = builder.stream(myTopic);

// do work

kStreams = new KafkaStreams(builder, config);
kStreams.start();

And another part of my code base that establishes a connection to our cluster using the consumer client directly.

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config, keyDeserializer, valueDeserializer);
consumer.subscribe(Collections.singletonList(sourceTopic));
consumer.poll(500L);
// etc
consumer.close();

The reason I am doing this is to gather meta data about the consumer group before conditionally kicking off other parts of the app (which includes the Kafka Streams topology). There are probably other ways to do this (e.g. through various hooks or what not), but I am more curious about why the intermixing of these methods will sometimes (intermittently) lead to a InconsistentGroupProtocolException being thrown.

Could someone please shed some light on why this is being thrown? I'm having a difficult time determining what exactly is going on from the source code itself, but I guess the underlying consumers that are constructed by Kafka Streams are specifying a different partitioning protocol than the KafkaConsumer client. Anyways, any help in understanding this exception will be greatly appreciated

like image 947
foxygen Avatar asked Jan 11 '17 22:01

foxygen


People also ask

How to assign partition to consumer?

In such case you need to assign every topic evenly to all consumers. Due to this assignor treating every topic separately, you can expect that all consumers consume from all of the topics. In different assignors, one consumer might end up having all partitions from one topic and no partition from the others.

What is default partition assignment strategy?

The RangeAssignor is the default strategy. The aims of this strategy is to co-localized partitions of several topics. This is useful, for example, to join records from two topics which have the same number of partitions and the same key-partitioning logic.

How do partitions get assigned in Kafka?

Consumer partition assignmentWhenever a consumer enters or leaves a consumer group, the brokers rebalance the partitions across consumers, meaning Kafka handles load balancing with respect to the number of partitions per application instance for you. This is great—it's a major feature of Kafka.


1 Answers

You put the answer yourself. Kafka Streams uses a custom partition assigner and a Kafka Streams client only works with other Kafka Streams clients. If you use a KafkaConsumer with the same group ID as your Kafka Streams app, it will fail to fence off KafkaConsumers to join the Kafka Streams consumer group. Obviously, KafkaConsumer cannot "play" with Kafka Streams.

like image 138
Matthias J. Sax Avatar answered Oct 27 '22 06:10

Matthias J. Sax