I'm using Kafka Connect in distributed mode. A strange behavior I've observed multiple times now is that, after some time (hours or days), what appears to be a rebalancing error happens: the same tasks get assigned to multiple workers. As a result, they run concurrently and, depending on the nature of the connector, either fail or produce "unpredictable" output.
The simplest configuration I was able to reproduce the behavior with is: two Kafka Connect workers and two connectors, each connector with only one task. Kafka Connect is deployed on Kubernetes; Kafka itself is in Confluent Cloud. Kafka Connect and Kafka are both the same version (5.3.1).
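For reference, the connectors were created through the Connect REST API with tasks.max=1. A rough sketch of the kind of request used - the connector class and connection settings below are illustrative placeholders, not the exact configuration:

# Illustrative only: connector class and connection settings are placeholders
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "some-mqtt-source",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "tcp://mqtt-broker:1883",
    "mqtt.topics": "some/topic",
    "kafka.topic": "some-topic"
  }
}'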
Relevant messages from the log:
Worker A:
[2019-10-30 12:44:23,925] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Successfully joined group with generation 488 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:469)
[2019-10-30 12:44:23,926] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Joined group at generation 488 and got assignment: Assignment{error=0, leader='connect-1-d5c19893-b33c-4f07-85fb-db9736795759', leaderUrl='http://10.16.0.15:8083/', offset=250, connectorIds=[some-hdfs-sink, some-mqtt-source], taskIds=[some-hdfs-sink-0, some-mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
Worker B:
[2019-10-30 12:44:23,930] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Successfully joined group with generation 488 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:469)
[2019-10-30 12:44:23,936] INFO [Worker clientId=connect-1, groupId=some-kafka-connect-cluster] Joined group at generation 488 and got assignment: Assignment{error=0, leader='connect-1-d5c19893-b33c-4f07-85fb-db9736795759', leaderUrl='http://10.16.0.15:8083/', offset=250, connectorIds=[some-mqtt-source], taskIds=[some-mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
In the above log extracts you can see that the same task (some-mqtt-source-0) is assigned to two workers. After this message, I can also see log messages from the task instances on both workers.
This behavior doesn't depend on the connector (I've observed it with other tasks as well). It also doesn't happen immediately after the workers are started, but only after some time.
My question is: what can cause this behavior?
EDIT 1: I've tried running three workers instead of two, thinking it might be a distributed consensus issue. It appears it isn't; having three workers doesn't fix the issue.
EDIT 2:
I've noticed that just before worker A is assigned a task that originally ran on worker B, worker B observes an error joining the group. For example, if tasks get "duplicated" in generation N, worker B has no "Successfully joined group with generation N" message in its logs. Moreover, between generations N-1 and N+1, worker B typically logs errors like "Attempt to heartbeat failed for since member id" and "Group coordinator bx-xxx-xxxxx.europe-west1.gcp.confluent.cloud:9092 (id: 1234567890 rack: null) is unavailable or invalid". Worker B typically joins generation N+1 shortly after generation N (sometimes only about 3 seconds later). So it is now clear what triggers the behavior. However:
Although I understand that temporary issues like these may occur and are probably normal in the general case, why doesn't rebalancing fix the problem once all workers have successfully joined the next generation? More rebalances do follow, but they never redistribute the tasks correctly and keep the "duplicates" forever (until I restart the workers).
It also appears that in some periods a rebalance happens only once every few hours, while in other periods it happens every 5 minutes (precise to the second). What could be the reason, and what is normal?
What could be the reason for the "Group coordinator is unavailable or invalid" errors, given that I use Confluent Cloud? Are there any Kafka Connect configuration parameters that can be tweaked to make it more resilient to this error? I know about session.timeout.ms and heartbeat.interval.ms, but the documentation is so minimal that it isn't even clear what the practical impact of making these values smaller or larger would be.
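For what it's worth, these are worker-level settings in the distributed worker configuration (connect-distributed.properties). A minimal sketch of the relevant knobs, shown with their stock default values just to make them concrete:

# Group membership settings of a distributed Connect worker (values shown are the defaults)
group.id=some-kafka-connect-cluster
# The coordinator evicts a worker if it receives no heartbeat within this window
session.timeout.ms=10000
# How often the worker sends heartbeats to the group coordinator
heartbeat.interval.ms=3000
# How long the group waits for all members to rejoin during a rebalance
rebalance.timeout.ms=60000

Roughly: a larger session.timeout.ms makes the group more tolerant of slow or briefly unreachable workers (at the cost of slower failure detection), while heartbeat.interval.ms should stay well below it, typically around a third.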
EDIT 3: I've observed that the issue is not critical for sink tasks: although the same sink tasks get assigned to multiple workers, the corresponding consumers are assigned to different partitions as they normally should be, and everything works almost as it should - I simply get more tasks than I originally asked for. For source tasks, however, the behavior is breaking: the tasks run concurrently and compete for resources on the source side.
EDIT 4: Meanwhile, I downgraded Kafka Connect to version 2.2 (Confluent Platform 5.2.3), a pre-"Incremental Cooperative Rebalancing" version. It has been working fine for the last 2 days. So, I assume the behavior is related to the new rebalancing mechanism.
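An alternative I did not try: on 2.3.x the workers can reportedly be switched back to the old eager rebalancing protocol without downgrading, via the connect.protocol worker property (this disables incremental cooperative rebalancing entirely):

# Set on every worker to fall back to the pre-2.3 eager rebalancing protocol
connect.protocol=eager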
Some background on the moving parts involved. Kafka Connect is an API for moving data into and out of Kafka, whereas Kafka Streams is an API for writing client applications that transform data in Apache Kafka, usually by publishing the transformed data onto a new topic; that processing happens inside your client application, not on a Kafka broker.

The process that does the work in Kafka Connect is called a worker. There are two modes for running workers: standalone mode, useful for development and testing on a local machine, and distributed mode, which runs the workers as a cluster and is what is used here.

Tasks are the main actor in the data model for Connect. Each connector instance coordinates a set of tasks that actually copy the data. By allowing the connector to break a single job into many tasks, Kafka Connect provides built-in support for parallelism and scalable data copying with very little configuration. How many tasks a connector creates depends on its configuration (the tasks.max setting and, for example, the number of partitions of the Kafka topic it is reading from). The workers actually start the tasks; the tasks are responsible for getting data into and out of Kafka on the source or sink side, while the workers manage the data flow to and from the Kafka topics.
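With that model in mind, the Connect REST API also reports which worker each task is assigned to, which is handy when cross-checking assignments against the logs. A quick check, using the worker URL and connector name from the question:

# Returns connector and task states, including the worker_id each task runs on,
# e.g. "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "10.16.0.15:8083"}]
curl -s http://10.16.0.15:8083/connectors/some-mqtt-source/status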
As mentioned in the comments, KAFKA-9184 was filed to address this problem, and it has since been resolved. The fix is available in versions 2.3.2 and above. The answer is therefore: upgrading to a recent version should prevent this problem from occurring.