How do I get, set, or reset the offset of a Kafka Connect connector/task/sink?
I can use the /usr/bin/kafka-consumer-groups tool, which runs kafka.admin.ConsumerGroupCommand, to see the offsets for all my regular Kafka consumer groups. However, Kafka Connect tasks and groups do not show up with this tool.
Similarly, I can use the zookeeper-shell to connect to ZooKeeper, and I can see ZooKeeper entries for regular Kafka consumer groups, but not for Kafka Connect sinks.
How do I change a consumer offset? Use kafka-consumer-groups.sh to change or reset the offset. Specify the topic and the consumer group, and pass the --reset-offsets flag.
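As a rough illustration of the reset described above, the following Python sketch only assembles the argument list for kafka-consumer-groups.sh; it does not run anything. The helper name build_reset_offsets_command, the broker address, the group and the topic are placeholders invented for this example. It assumes a Kafka release that ships --reset-offsets (0.11 or later) and that the group has no active members when the reset is executed.

```python
from typing import List, Optional


def build_reset_offsets_command(
    bootstrap_server: str,
    group: str,
    topic: str,
    to_offset: Optional[int] = None,
    execute: bool = False,
) -> List[str]:
    """Assemble (but do not run) a kafka-consumer-groups.sh reset command.

    All argument values are supplied by the caller; nothing here is
    specific to any real cluster.
    """
    cmd = [
        "kafka-consumer-groups.sh",
        "--bootstrap-server", bootstrap_server,
        "--group", group,
        "--topic", topic,
        "--reset-offsets",
    ]
    if to_offset is None:
        # No explicit offset given: rewind to the start of the topic.
        cmd.append("--to-earliest")
    else:
        cmd += ["--to-offset", str(to_offset)]
    # --dry-run only prints the planned offsets; --execute applies them.
    cmd.append("--execute" if execute else "--dry-run")
    return cmd


if __name__ == "__main__":
    # Placeholder values, for illustration only.
    print(" ".join(build_reset_offsets_command("localhost:9092", "my-group", "my-topic")))
```

Starting with the dry run and switching to execute=True only after checking the printed plan is probably the safer order.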
You can use the REST API to view the current status of a connector and its tasks, including the ID of the worker to which each was assigned. Connectors and their tasks publish status updates to a shared topic (configured with status.storage.topic) which all workers in the cluster monitor.
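To make the status lookup more concrete, here is a small Python sketch that builds the status URL for a connector and summarizes a status payload. It assumes the usual shape of the GET /connectors/{name}/status response (a "connector" object and a "tasks" list, each carrying "state" and "worker_id"). The helper names, the base URL and the sample payload are made up for illustration, and no HTTP request is sent.

```python
from typing import Dict, List


def status_url(base_url: str, connector: str) -> str:
    """Build the status endpoint URL for one connector."""
    return f"{base_url.rstrip('/')}/connectors/{connector}/status"


def summarize_status(payload: Dict) -> List[str]:
    """Turn a status payload into one line per connector/task."""
    conn = payload["connector"]
    lines = [f"connector {payload['name']}: {conn['state']} on {conn['worker_id']}"]
    for task in payload.get("tasks", []):
        lines.append(f"task {task['id']}: {task['state']} on {task['worker_id']}")
    return lines


if __name__ == "__main__":
    # Hypothetical payload in the assumed response shape.
    sample = {
        "name": "my-sink",
        "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
        "tasks": [
            {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.1:8083"},
            {"id": 1, "state": "FAILED", "worker_id": "10.0.0.2:8083"},
        ],
    }
    print(status_url("http://localhost:8083", "my-sink"))
    for line in summarize_status(sample):
        print(line)
```

In practice the payload would come from an HTTP GET against a Connect worker; that step is left out so the sketch runs without a cluster.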
As of 0.10.0.0, Connect doesn't provide an API for managing offsets. It's something we want to improve in the future, but it's not there yet. The ConsumerGroupCommand would be the right tool to manage offsets for sink connectors. Note that source connector offsets are stored in a special offsets topic for Connect (they aren't like normal Kafka offsets, since they are defined by the source system; see offset.storage.topic in the worker configuration docs). Since sink connectors use the new consumer, they don't store their offsets in ZooKeeper; all modern clients use native Kafka-based offset storage. The ConsumerGroupCommand can work with these offsets: you just need to pass the --new-consumer option.
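As a sketch of what that looks like for a sink connector, the Python snippet below assembles a describe command for the connector's consumer group. It assumes the default group naming, where a sink connector consumes under the group "connect-" followed by the connector name. The helper names and the connector name are hypothetical. The --new-consumer flag is included only on request, since it applies to the 0.10-era tool mentioned above and, as far as I know, was deprecated and later removed in newer releases.

```python
from typing import List


def sink_group_id(connector_name: str) -> str:
    """Default consumer group of a sink connector (assumed naming: connect-<name>)."""
    return f"connect-{connector_name}"


def build_describe_command(
    bootstrap_server: str,
    connector_name: str,
    legacy_new_consumer_flag: bool = False,
) -> List[str]:
    """Assemble (but do not run) a describe command for a sink connector's group."""
    cmd = ["kafka-consumer-groups.sh", "--bootstrap-server", bootstrap_server]
    if legacy_new_consumer_flag:
        # Only meaningful on old tool versions such as 0.10.x.
        cmd.append("--new-consumer")
    cmd += ["--describe", "--group", sink_group_id(connector_name)]
    return cmd


if __name__ == "__main__":
    # "my-sink" is a placeholder connector name.
    print(" ".join(build_describe_command("localhost:9092", "my-sink", legacy_new_consumer_flag=True)))
```

This covers sink connectors only; source connector offsets live in the Connect offsets topic and are not visible through consumer group tooling.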