 

Restart Kafka Connect sink and source connectors to read from the beginning

I have searched quite a lot on this, but there doesn't seem to be a good guide on it.

From what I have searched there are a few things to consider:

  • Resetting the sink connector's internal topics (status, config and offset).
  • How a source connector stores its offsets is implementation specific.

Question: Is there even a need to reset these topics?

  • Deleting the consumer group.
  • Restarting the connector with a different name (this is also an option), but it doesn't seem to be the right thing to do.
  • Resetting the consumer group with --reset-offsets --to-earliest.
  • Using the REST API (does it provide the functionality to reset and read from the beginning?).

What would be the best way to restart both a sink and a source connector so that they read from the beginning?

el323 asked Mar 27 '19 12:03


People also ask

How do I restart Kafka connect connector?

The Kafka Connect REST API includes two endpoints to restart a connector and a task:

  • POST /connectors/{connector-name}/restart
  • POST /connectors/{connector-name}/tasks/{task-id}/restart

Sending a request to one of these endpoints will cause the Connect framework to restart the connector/task.
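
As a rough sketch, the two restart endpoints could be called with curl as below. The worker address (localhost:8083) and the connector name in the usage comment are assumptions, not values from this thread. Note that a restart resumes from the stored offsets, so on its own it does not make a connector read from the beginning.

```shell
#!/bin/sh
# Assumption: the Connect worker's REST API listens on localhost:8083.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Print the restart URL for a connector, or for one of its tasks
# when a task id is given as the second argument.
restart_url() {
  if [ -n "${2:-}" ]; then
    printf '%s/connectors/%s/tasks/%s/restart\n' "$CONNECT_URL" "$1" "$2"
  else
    printf '%s/connectors/%s/restart\n' "$CONNECT_URL" "$1"
  fi
}

# POST to the restart endpoint; -f makes curl exit non-zero on an HTTP error.
restart_connector() {
  curl -sf -X POST "$(restart_url "$@")"
}

# Usage (hypothetical connector name):
#   restart_connector my-sink      # restart the connector
#   restart_connector my-sink 0    # restart task 0 only
```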

What is auto offset reset in Kafka?

Use auto.offset.reset to define the behavior of the consumer when there is no committed position (which would be the case when the group is first initialized) or when an offset is out of range. You can choose either to reset the position to the “earliest” offset or the “latest” offset (the default).


2 Answers

For a source connector in distributed mode there is another option: producing a new message to the offsets topic. For example, I use the JDBC source connector. Consuming the offsets topic shows the following:

./kafka-console-consumer.sh --zookeeper localhost:2181/kafka11-staging --topic kc-staging--offsets --from-beginning --property print.key=true

["referrer-family-jdbc-source",{"query":"query"}]   {"incrementing":100}

To reset this, I produce another message with the same key and incrementing set to 0.

For example, producing a keyed message from the shell:

./kafka-console-producer.sh \
  --broker-list `hostname`:9092 \
  --topic kc-staging--offsets \
  --property "parse.key=true" \
  --property "key.separator=|"
["referrer-family-jdbc-source",{"query":"query"}]|{"incrementing":0}

Please note that you need to do the following, in this order:

  • Delete the connector.
  • Produce a message with the relevant offset as I described above.
  • Create the connector again.
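
The three steps above can be sketched as one shell function. This is only an illustration under stated assumptions: the worker address (localhost:8083), the broker address (localhost:9092), Kafka's bin/ directory being on PATH, and a saved connector definition file are all hypothetical. The file passed as the fourth argument is assumed to hold the JSON body that POST /connectors expects, with a "name" and a "config" field. The key should be copied exactly as the console consumer printed it, since the record only replaces the old offset if the key matches.

```shell
#!/bin/sh
# Assumptions (all hypothetical): worker REST API at localhost:8083,
# broker at localhost:9092, Kafka's bin/ directory on PATH.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"
BROKER="${BROKER:-localhost:9092}"
OFFSETS_TOPIC="${OFFSETS_TOPIC:-kc-staging--offsets}"

# Build one "key|value" line for the console producer.
# $1 = key exactly as printed by the console consumer, $2 = new offset JSON.
offset_record() {
  printf '%s|%s\n' "$1" "$2"
}

# $1 = connector name, $2 = offset key, $3 = offset value,
# $4 = file with the connector definition ({"name": ..., "config": {...}}).
reset_source_offset() {
  name=$1 key=$2 value=$3 config_file=$4

  # 1. Delete the connector so no task is running while the offset changes.
  curl -sf -X DELETE "$CONNECT_URL/connectors/$name" || return 1

  # 2. Produce the replacement offset under the same key.
  offset_record "$key" "$value" |
    kafka-console-producer.sh --broker-list "$BROKER" \
      --topic "$OFFSETS_TOPIC" \
      --property "parse.key=true" \
      --property "key.separator=|" || return 1

  # 3. Create the connector again from its saved definition.
  curl -sf -X POST -H "Content-Type: application/json" \
    --data @"$config_file" "$CONNECT_URL/connectors"
}

# Usage (hypothetical file name):
#   reset_source_offset referrer-family-jdbc-source \
#     '["referrer-family-jdbc-source",{"query":"query"}]' \
#     '{"incrementing":0}' connector.json
```

If the key itself contains a | character, a different key.separator would be needed.
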
Ehud Lev answered Sep 22 '22 14:09


Source Connector:

  • Standalone mode: remove the offset file (/tmp/connect.offsets) or change the connector name.
  • Distributed mode: change the name of the connector.
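
For the standalone case, a minimal sketch of removing the offset file is shown below. It moves the file aside rather than deleting it, which is a choice made here and not part of the answer. It assumes the worker has been stopped first, since a running worker would likely write its in-memory offsets back to the file. The function name is hypothetical.

```shell
#!/bin/sh
# Move a standalone worker's offset file aside instead of deleting it.
# $1 = offset file path (the worker's offset.storage.file.filename setting);
# defaults to /tmp/connect.offsets, the path named in the answer.
reset_standalone_offsets() {
  offsets_file="${1:-/tmp/connect.offsets}"
  if [ ! -f "$offsets_file" ]; then
    echo "no offset file at $offsets_file, nothing to reset" >&2
    return 0
  fi
  # Keep a timestamped backup so the old position can be restored.
  mv "$offsets_file" "$offsets_file.$(date +%Y%m%d%H%M%S).bak"
}

# Usage: stop the standalone worker, then run
#   reset_standalone_offsets
# and start the worker again.
```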

Sink Connector (both modes), use one of the following methods:

  • Change the name.
  • Reset the offsets for the connector's consumer group. By default the group is named connect-<connector name>.

To reset the offsets, first delete the connector, then reset the group's offsets:

./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --group connect-connectorName --reset-offsets --to-earliest --execute --topic topicName

Finally, create the connector again with the same configuration.
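
A hedged sketch of the sink reset follows. It previews the change with --dry-run before applying it with --execute. The broker address (localhost:9092) and Kafka's bin/ directory being on PATH are assumptions, and the function names are hypothetical. Sink connectors consume with a group named connect-<connector name> by default, and the group has to be inactive (connector deleted) for the reset to be accepted.

```shell
#!/bin/sh
# Assumption: broker at localhost:9092, Kafka's bin/ directory on PATH.
BROKER="${BROKER:-localhost:9092}"

# Sink connectors consume with the group "connect-<connector name>" by default.
sink_group() {
  printf 'connect-%s\n' "$1"
}

# $1 = connector name, $2 = topic, $3 = --dry-run (default) or --execute.
reset_sink_offsets() {
  mode=${3:-"--dry-run"}
  kafka-consumer-groups.sh --bootstrap-server "$BROKER" \
    --group "$(sink_group "$1")" \
    --topic "$2" \
    --reset-offsets --to-earliest "$mode"
}

# Usage (hypothetical names), after deleting the connector:
#   reset_sink_offsets my-sink my-topic             # preview the new offsets
#   reset_sink_offsets my-sink my-topic --execute   # apply the reset
```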

You can also check the following question: Reset the JDBC Kafka Connector to start pulling rows from the beginning of time?

Bartosz Wardziński answered Sep 20 '22 14:09