I have create a sink kafka connect that convert data to other storage; I want to set auto.offset.reset as latest when new connector is created with kafka connect rest api; I have set consumer.auto.offset.reset: latest in configs;
json
{
"name": "test_v14",
"config": {
"name": "test_v14",
"consumer.auto.offset.reset": "latest",
"connector.class": "...",
...
}
}
But when task started, kafka consumer still poll records from earliest; So is any other ways to set auto.offset.reset as latest;
Prior to Kafka 2.3
consumer.auto.offset.reset needs to be set in the connect-distributed.properties file (the Worker).
It cannot be applied to any particular Connector unless that connector class is explicitly creating and loading its own Consumer objects that read in that property.
As of Apache Kafka 2.3, it is now possible to set this as part of a connector's configuration.
On the worker set:
connector.client.config.override.policy=All
Then in the connector you can specify
"consumer.override.auto.offset.reset": "latest"
See this for more details: https://rmoff.net/2019/08/09/starting-a-kafka-connect-sink-connector-at-the-end-of-a-topic/
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With