
Second and Third Distributed Kafka Connector workers failing to work correctly

With a Kafka cluster of 3 nodes and a Zookeeper cluster of the same size, I brought up one distributed connector node. This node ran successfully with a single task. I then brought up a second connector worker; it appeared to start, since some of the task's code clearly ran, but it didn't seem to stay alive. No errors were thrown; the failure was observed as a lack of expected activity, while the first connector continued to function correctly. When I call http://localhost:8083/connectors/mqtt/tasks on each connector node, it tells me the connector has one task. I would expect this to be two tasks, one for each node/worker. (Currently the worker configuration says tasks.max = 1, but I've also tried setting it to 3.)
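For reference, I'm checking the task assignment on each node with plain HTTP calls like these (my connector happens to be named "mqtt"):

  curl http://localhost:8083/connectors/mqtt/tasks
  curl http://localhost:8083/connectors/mqtt/status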

When I try and bring up a third connector, I get the error:

"POST /connectors HTTP/1.1" 500 90  5 
(org.apache.kafka.connect.runtime.rest.RestServer:60)

ERROR IO error forwarding REST request: 
(org.apache.kafka.connect.runtime.rest.RestServer:241) 
java.net.ConnectException: Connection refused

Calling the connector POST endpoint again from the shell returns the error:

 {"error_code":500,"message":"IO Error trying to forward REST request:
 Connection refused"}

I also tried upgrading to Apache Kafka 0.10.1.1, which was released today, but I'm still seeing the problems. The connectors each run in isolated Docker containers built from a single image, so they should be identical.

The problem could be that I'm trying to run the POST request to http://localhost:8083/connectors on each worker, when I only need to run it once on a single worker, after which the tasks for that connector would automatically be distributed to the other workers. If that is the case, how do I get the tasks to distribute? I currently have tasks.max set to three, but only one task appears to be running, on a single worker.
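For completeness, the request I'm POSTing to each worker looks roughly like this (the connector class and topic names are trimmed and purely illustrative):

  curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
    "name": "mqtt",
    "config": {
      "connector.class": "com.example.mqtt.MqttSinkConnector",
      "tasks.max": "3",
      "topics": "mqtt-messages"
    }
  }'

If submitting this once to a single worker is the correct approach, I'd expect the framework to spread the resulting tasks across the whole group.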

Update

I ultimately got things running using essentially the approach Yuriy suggested below. I gave each worker a unique group ID, then gave each connector task the same name. This allowed the three connectors, each with a single task, to share a single offset, so that in the case of sink connectors the messages they consumed from Kafka were not duplicated. They are effectively running as standalone connectors, since the workers have different group IDs and therefore don't communicate with each other.
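In configuration terms, the workaround amounts to giving each worker its own group in its connect-distributed.properties (values illustrative) and then POSTing the identical connector config to every worker:

  # connect-distributed.properties on worker 1
  group.id=connect-worker-1

  # worker 2
  group.id=connect-worker-2

  # worker 3
  group.id=connect-worker-3

Because the connector name is the same everywhere, the sink tasks end up, as far as I can tell, in the same underlying consumer group, which is why the offsets are shared and messages aren't duplicated.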

If the connector workers have the same group ID, you can't add more than one connector with the same name. If you give the connectors different names, they will have different offsets and consume duplicate messages. With three workers in the same group, one connector and three tasks, you would in theory have the ideal situation: the tasks share an offset, and the workers make sure the tasks are always running and well distributed, with each task consuming a unique set of partitions. In practice, the connector framework never created more than one task, even with tasks.max set to 3 and even though the topic the tasks consume from has 25 partitions.

If anyone knows why I'm seeing this behaviour, please let me know.

Asked Dec 21 '16 by LaserJesus


2 Answers

I'm posting an answer to an old question, since Kafka Connect has moved on a lot in three years.

In the latest version (2.3.1) there is incremental rebalancing, which massively improves the behaviour of Kafka Connect.
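As a rough sketch, the rebalancing behaviour is controlled by the connect.protocol worker setting; from 2.3 onwards the cooperative mode is the default, so usually nothing needs changing:

  # connect-distributed.properties
  group.id=connect-cluster
  connect.protocol=compatible    # incremental cooperative rebalancing (the default from 2.3)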

It's also worth noting that when configuring Kafka Connect, rest.advertised.host.name must be set correctly; if it isn't, you will see errors including the one quoted here:

{"error_code":500,"message":"IO Error trying to forward REST request: Connection refused"}

See this post for more details.
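A minimal sketch of the relevant worker settings (hostname purely illustrative):

  # connect-distributed.properties
  rest.port=8083
  rest.advertised.host.name=connect-worker-1.internal
  rest.advertised.port=8083

Each worker has to advertise an address the other workers can actually reach; otherwise forwarded REST requests fail with exactly the Connection refused error above.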

Answered by Robin Moffatt


I've encountered a similar issue in the same situation as yours.

  1. tasks.max is configured per connector, and the distributed workers automatically decide which nodes handle the topic. So if you have 3 workers in a cluster and your connector configuration says tasks.max=2, then only 2 of the 3 workers will process the topic. In theory, if one of those workers fails, the third should pick up the workload. But...
  2. The distributed connector turned out to be very unreliable: once you add/remove some nodes, the cluster broke down and all the workers did nothing but try to elect a leader, and fail. The only way to fix it was to restart the whole cluster, preferably with all workers restarted simultaneously.

I chose another way: I used standalone workers, and it works like a charm for me, because distribution of load is implemented at the Kafka client level; once a worker drops, the cluster rebalances automatically and the remaining clients connect to the unoccupied topics.
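For illustration, a standalone worker on each node is started with something like this (paths illustrative):

  bin/connect-standalone.sh config/connect-standalone.properties config/my-sink-connector.properties

Each node runs its own worker with the same connector properties, and the normal Kafka consumer group mechanics spread the topic's partitions across them.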

PS. Maybe this will be useful for you too: the Confluent connector is not tolerant of invalid payloads that do not match the topic's schema. Once the connector gets an invalid message, it silently dies. The only way to find out is to analyse the metrics.

Answered by Yuriy Tseretyan