Kafka Connect distributed mode: the group coordinator is not available

I have been working on this for two weeks now. My Kafka cluster runs on separate machines from my Connect nodes, and I am unable to get Connect running properly. I can read from and write to Kafka with no issues, and ZooKeeper seems to be running fine.

I launch connect:

$ bin/connect-distributed connect-distributed.properties

Connect keeps looping through this error:

[2018-08-21 15:45:12,161] INFO [Worker clientId=c1, groupId=connect-cluster] Discovered group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:677)
[2018-08-21 15:45:12,163] INFO [Worker clientId=c1, groupId=connect-cluster] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:509)
[2018-08-21 15:45:12,165] INFO [Worker clientId=c1, groupId=connect-cluster] Group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:729)
[2018-08-21 15:45:12,266] INFO [Worker clientId=c1, groupId=connect-cluster] Discovered group coordinator 172.25.40.219:9092 (id: 2147483645 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:677)
[2018-08-21 15:45:12,267] INFO [Worker clientId=c1, groupId=connect-cluster] Group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:729)

Here is what my connect-distributed.properties looks like:

bootstrap.servers=172.25.1.2:9092,172.25.1.3:9092,172.25.1.4:9092
group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3

config.storage.topic=connect-configs
config.storage.replication.factor=3

status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=3

offset.flush.interval.ms=10000

rest.host.name=172.25.1.5
rest.port=8083

heartbeat.interval.ms=3000
session.timeout.ms=30000
security.protocol=PLAINTEXT
client.id=c1

plugin.path=/usr/share/java
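
One check that may help here is to compare the coordinator addresses in the log with the bootstrap.servers list. The following is a minimal sketch, not part of the original setup; the function names are hypothetical and the regular expression only assumes the "Discovered group coordinator host:port" wording seen in the log above.

```python
import re

# Matches the "Discovered group coordinator host:port" wording from the log.
DISCOVERED = re.compile(r"Discovered group coordinator (\S+):(\d+)")


def bootstrap_hosts(properties_text):
    """Return the set of host:port entries from the bootstrap.servers line."""
    for line in properties_text.splitlines():
        key, _, value = line.partition("=")
        if key.strip() == "bootstrap.servers":
            return {entry.strip() for entry in value.split(",") if entry.strip()}
    return set()


def unexpected_coordinators(log_text, properties_text):
    """Return discovered coordinator addresses that are not bootstrap servers."""
    known = bootstrap_hosts(properties_text)
    found = {f"{host}:{port}" for host, port in DISCOVERED.findall(log_text)}
    return sorted(found - known)


if __name__ == "__main__":
    props = "bootstrap.servers=172.25.1.2:9092,172.25.1.3:9092,172.25.1.4:9092\n"
    log = (
        "Discovered group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null)\n"
        "Discovered group coordinator 172.25.40.219:9092 (id: 2147483645 rack: null)\n"
    )
    print(unexpected_coordinators(log, props))
```

An address outside the bootstrap list is not an error by itself, since bootstrap.servers may name only part of the cluster. With the log above, though, the sketch reports 172.25.40.219:9092, which sits on a different subnet from every configured broker and may be worth a closer look.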

The __consumer_offsets topic looks like this:

/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic __consumer_offsets                                       
Topic:__consumer_offsets        PartitionCount:50       ReplicationFactor:3     Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
    Topic: __consumer_offsets       Partition: 0    Leader: 2       Replicas: 1,2,3 Isr: 3,2
    Topic: __consumer_offsets       Partition: 1    Leader: 2       Replicas: 1,2,3 Isr: 3,2
    Topic: __consumer_offsets       Partition: 2    Leader: 2       Replicas: 1,2,3 Isr: 3,2
    Topic: __consumer_offsets       Partition: 3    Leader: 1       Replicas: 1     Isr: 1
    Topic: __consumer_offsets       Partition: 4    Leader: 2       Replicas: 2     Isr: 2.... etc
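
In the output above, broker 1 is listed as a replica for partitions 0 to 2 but is missing from their ISR. A rough way to spot this across all 50 partitions is sketched below; the function name is hypothetical, and the pattern only assumes the "Partition / Leader / Replicas / Isr" layout printed by the describe command shown above.

```python
import re

# One match per partition line of `kafka-topics.sh --describe` output.
PARTITION_LINE = re.compile(
    r"Partition:\s*(\d+)\s+Leader:\s*(-?\d+)\s+Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]+)"
)


def shrunken_isr(describe_output):
    """Return (partition, missing_broker_ids) for partitions whose ISR lacks a replica."""
    result = []
    for match in PARTITION_LINE.finditer(describe_output):
        partition = int(match.group(1))
        replicas = {int(b) for b in match.group(3).split(",") if b}
        isr = {int(b) for b in match.group(4).split(",") if b}
        missing = sorted(replicas - isr)
        if missing:
            result.append((partition, missing))
    return result


if __name__ == "__main__":
    sample = (
        "Topic: __consumer_offsets  Partition: 0  Leader: 2  Replicas: 1,2,3  Isr: 3,2\n"
        "Topic: __consumer_offsets  Partition: 3  Leader: 1  Replicas: 1  Isr: 1\n"
    )
    for partition, missing in shrunken_isr(sample):
        print(f"partition {partition}: replicas not in sync: {missing}")
```

The describe output can be piped into a file and passed to shrunken_isr as a string; partitions whose ISR matches their replica list are simply skipped.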
Asked by ldrrp on Aug 21 '18 at 15:08



2 Answers

I ran into the same issue after writing a connector in Go, and had to work it out myself.

When a connector connects to Kafka, it automatically writes to the Connect topics and to __consumer_offsets. When a connector crashes, it leaves a trace of itself in those topics as the coordinator. When a new connector starts up, it finds that record and attempts to communicate with the coordinator. The coordinator fails to respond, and the connector never works.

You can fix this in one of two ways. The first is to delete all the topics (connect-configs, connect-offsets, connect-status, __consumer_offsets) and restart the cluster. The other is to remove the coordinator from the topics, which I am currently unsure how to perform.
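
As a rough illustration of the first option, the sketch below builds the delete commands for the three Connect topics named in connect-distributed.properties. It is only a sketch under assumptions: the helper name is hypothetical, the script path and ZooKeeper address are taken from the describe command in the question, and it assumes topic deletion is enabled on the brokers. The commands are printed rather than executed so they can be reviewed first.

```python
import shlex


def delete_topic_commands(topics, zookeeper="localhost:2181",
                          script="/opt/kafka/bin/kafka-topics.sh"):
    """Build one kafka-topics.sh --delete command (as an argument list) per topic."""
    return [
        [script, "--delete", "--zookeeper", zookeeper, "--topic", topic]
        for topic in topics
    ]


if __name__ == "__main__":
    # Topic names as configured in connect-distributed.properties.
    connect_topics = ["connect-configs", "connect-offsets", "connect-status"]
    for command in delete_topic_commands(connect_topics):
        # Printed, not run: pass each list to subprocess.run once it looks right.
        print(" ".join(shlex.quote(part) for part in command))
```

Newer Kafka releases take --bootstrap-server instead of --zookeeper for this tool, so the flag may need adjusting for the version in use.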

Answered by ldrrp on Jan 02 '23 at 22:01


Posting for others who might find it useful.

I had the same problem, and restarting Kafka solved it in my case.

After executing:

service kafka restart

my log looked fine again in less than 10 seconds:

2019-11-08 14:30:19.781  INFO [-,,,] 1 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=datasources-ca-contacts] Discovered group coordinator myserver:9092 (id: 2147483647 rack: null)
Answered by Federico Piazza on Jan 02 '23 at 23:01