I have searched quite a lot on this but there doesn't seems to be a good guide around this.
From what I have searched there are a few things to consider:
Question: Is there even a need to reset these topics?
--reset-offsets
to --to-earliest
What would be the best way to restart both a sink and a source connector to read from beginning?
The Kafka Connect REST API includes two endpoints to restart a connector and a task: POST /connectors/{connector-name}/restart. POST /connectors/{connector-name}/tasks/{task-id}/restart Sending a request to one of these endpoints will cause the Connect framework to restart the connector/task.
Second, use auto. offset. reset to define the behavior of the consumer when there is no committed position (which would be the case when the group is first initialized) or when an offset is out of range. You can choose either to reset the position to the “earliest” offset or the “latest” offset (the default).
Source connector Distributed mode - has another option which is producing a new message to the offset topic. For example I use jdbc source connector: When looking on the offset topic I see the following:
./kafka-console-consumer.sh --zookeeper localhost:2181/kafka11-staging --topic kc-staging--offsets --from-beginning --property print.key=true
["referrer-family-jdbc-source",{"query":"query"}] {"incrementing":100}
Now in order to reset this I just produce another message with incrementing:0
For example: how to produce from shell with key from here
./kafka-console-producer.sh \
--broker-list `hostname`:9092 \
--topic kc-staging--offsets \
--property "parse.key=true" \
--property "key.separator=|"
["referrer-family-jdbc-source",{"query":"query"}]|{"incrementing":0}
Please note that you need to do the following:
Source Connector:
/tmp/connect.offsets
) or change connector name.Sink Connector (both modes) one of the following methods:
To reset offset you have to first delete connector, reset offset (./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --group connectorName --reset-offsets --to-earliest --execute --topic topicName
), add same configuration one more time
You can check following question: Reset the JDBC Kafka Connector to start pulling rows from the beginning of time?
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With