Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

The group member's supported protocols are incompatible with those of existing members

I'm facing an issue related to Kafka.

I'm having my current service (Producer) that sends the message to a Kafka topic (events). The service is using kafka_2.12 v1.0.0, written in Java.

I'm trying to integrate it with the sample project of spark-streaming as a Consumer service (here using kafka_2.11 v0.10.0, written in Scala)

The message is sent successfully from Producer to the Kafka topic. However, I always receive the error stack below:

Exception in thread "main" org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member's supported protocols are incompatible with those of existing members.
    at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:577)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:571)    at com.jj.streaming.ItemApp$.delayedEndpoint$com$jj$streaming$ItemApp$1(ItemApp.scala:72)
    at com.jj.streaming.ItemApp$delayedInit$body.apply(ItemApp.scala:12)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)     at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)   at scala.App$class.main(App.scala:76)
    at com.jj.streaming.ItemApp$.main(ItemApp.scala:12)
    at com.jj.streaming.ItemApp.main(ItemApp.scala)

I don't know the root cause. How can I fix this?

like image 892
Tin Nguyen Avatar asked Jul 23 '18 20:07

Tin Nguyen


1 Answers

This happens in my configuration when I attempt to add a consumer to the cluster that is using a different partition assignment strategy from the previous ones.

For example:

partition.assignment.strategy=org.apache.kafka.clients.consumer.RandomAccessAssignor

mixed with or defaulted to:

partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor

like image 197
John Cairns Avatar answered Oct 19 '22 01:10

John Cairns