 

How to properly restart a Kafka S3 sink connector?

I have been running a Kafka S3 sink connector (the bundled connector from the Confluent package) since 1 May. It worked fine until 8 May, when checking the status showed that an AWS exception had crashed the connector. This should not be a big problem, so I want to restore it.

I tried the following steps:

  1. I sent POST /connectors/s3sink/restart. The connector then showed as RUNNING, but the task was still FAILED.
  2. Then I sent POST /connectors/s3sink/tasks/0/restart. Now the task is RUNNING as well.

But when I tailed the log, I found it had started rewriting old data, such as data from 3 May, and it messed up the existing data!

So, does the Connect restart REST API reset the offsets? I thought it would keep the committed offsets and simply resume from where it failed.

And how do I restart a failed connector task correctly? By deleting the pods (I'm running on Kubernetes), or via the REST endpoint /tasks/0/restart? And when should I use /connectors/s3sink/restart?

Asked by Xiang Zhang on Jan 28 '23 03:01



1 Answer

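POST /connectors/:name/restart is a restart operation coordinated by the leader worker, which has to propagate asynchronously to the other workers. So you need to ensure network connectivity between the leader worker and all the others. Note that, by default, this call restarts only the Connector instance and not its tasks, which is why your task stayed FAILED; since Apache Kafka 3.0 (KIP-745) you can add ?includeTasks=true&onlyFailed=true to restart the failed tasks in the same call.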
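A minimal sketch of the connector-level restart call, using only the Python standard library. The worker URL http://localhost:8083 is an assumption (8083 is the default Kafka Connect REST port), the helper name connector_restart_request is hypothetical, and the includeTasks/onlyFailed query parameters are only understood by Apache Kafka 3.0+ workers. The request is built but not sent, so you can inspect it first.

```python
from urllib.parse import quote, urlencode
from urllib.request import Request


def connector_restart_request(base_url: str, name: str,
                              include_tasks: bool = False,
                              only_failed: bool = False) -> Request:
    """Build POST /connectors/{name}/restart (hypothetical helper)."""
    url = f"{base_url.rstrip('/')}/connectors/{quote(name, safe='')}/restart"
    # includeTasks/onlyFailed need Kafka 3.0+ (KIP-745); without them the
    # worker restarts only the Connector instance, not its tasks.
    params = {}
    if include_tasks:
        params["includeTasks"] = "true"
    if only_failed:
        params["onlyFailed"] = "true"
    if params:
        url += "?" + urlencode(params)
    return Request(url, method="POST")


req = connector_restart_request("http://localhost:8083", "s3sink",
                                include_tasks=True, only_failed=True)
print(req.get_method(), req.full_url)
# To actually send it: urllib.request.urlopen(req)
```

On an older worker, leave both flags off and follow up with the per-task restart endpoint instead.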

POST /connectors/:name/tasks/:num/restart sends the request straight to the worker running that task, which restarts just that task's thread.
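To restart only the tasks that actually failed, one approach is to read GET /connectors/{name}/status and issue one POST /connectors/{name}/tasks/{id}/restart per task in the FAILED state. The sketch below assumes the usual status payload shape (a tasks list whose entries carry id and state); the sample payload, including task 1 and the worker ids, and the localhost URL are hypothetical, not taken from the question.

```python
import json
from urllib.parse import quote
from urllib.request import Request


def failed_task_ids(status: dict) -> list[int]:
    """Return the ids of tasks whose state is FAILED in a /status response."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") == "FAILED"]


def task_restart_requests(base_url: str, name: str, status: dict) -> list[Request]:
    """Build one POST /connectors/{name}/tasks/{id}/restart per failed task."""
    base = f"{base_url.rstrip('/')}/connectors/{quote(name, safe='')}/tasks"
    return [Request(f"{base}/{tid}/restart", method="POST")
            for tid in failed_task_ids(status)]


# Hypothetical status payload resembling the question:
# connector RUNNING, task 0 FAILED (task 1 added for illustration).
sample = json.loads("""
{"name": "s3sink",
 "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
 "tasks": [{"id": 0, "state": "FAILED", "worker_id": "10.0.0.2:8083"},
           {"id": 1, "state": "RUNNING", "worker_id": "10.0.0.1:8083"}],
 "type": "sink"}
""")

for req in task_restart_requests("http://localhost:8083", "s3sink", sample):
    print(req.get_method(), req.full_url)
    # urllib.request.urlopen(req) would actually send it
```

Restarting through the REST API this way touches only the failed task threads, whereas deleting Kubernetes pods restarts whole worker processes and triggers a rebalance of the Connect cluster.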

Restarting should not reset the offsets, since a sink connector's offsets are committed as consumer group offsets in the __consumer_offsets topic. If anything, the tasks were unable to commit offsets back to __consumer_offsets, in which case you should see errors about it in the logs.
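To see how a restart can replay old records without anything being "reset": a sink task resumes each partition at its last committed offset, so everything from that offset up to what it had already written is delivered again. The sketch below models this with plain dictionaries; the offsets are made-up numbers and the function names are hypothetical. connect-<connector name> is the default consumer group Kafka Connect uses for a sink connector, so you could check the real committed offsets with something like kafka-consumer-groups --bootstrap-server <broker> --describe --group connect-s3sink.

```python
def sink_group_id(connector_name: str) -> str:
    """Default consumer group that Kafka Connect uses for a sink connector."""
    return f"connect-{connector_name}"


def replayed_ranges(committed: dict[int, int],
                    processed: dict[int, int]) -> dict[int, range]:
    """Offsets each partition will see again after the task restarts.

    committed[p]: committed offset, i.e. the next offset the group will read
    processed[p]: last offset the task handled before it failed
    Partitions without a committed offset are skipped (their start position
    depends on auto.offset.reset, which this sketch does not model).
    """
    ranges = {}
    for partition, start in committed.items():
        last = processed.get(partition, start - 1)
        if last >= start:
            # The task resumes at `start`, so start..last is delivered twice.
            ranges[partition] = range(start, last + 1)
    return ranges


# Hypothetical numbers: commits on partition 0 stopped well before the
# failure, so it replays a large backlog; partition 1 replays one record.
committed = {0: 1200, 1: 980}
processed = {0: 1500, 1: 980}

print(sink_group_id("s3sink"))
for partition, r in sorted(replayed_ranges(committed, processed).items()):
    print(f"partition {partition}: {len(r)} records replayed "
          f"(offsets {r.start}..{r.stop - 1})")
```

If the committed offsets for connect-s3sink turn out to be days behind the failure point, that is consistent with the tasks having failed to commit offsets.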

Answered by OneCricketeer on Feb 05 '23 18:02
