I'm trying to reset consumer offset with latest CLI tools for Kafka.
kafka-consumer-groups.bat --bootstrap-server kafka-host:9092 --group my-group --reset-offsets --to-earliest --all-topics
As a result I see this output:
TOPIC PARTITION NEW-OFFSET FirstTopic 0 0 SecondTopic 0 0
But running again command:
kafka-consumer-groups.bat --bootstrap-server kafka-host:9092 --group my-group --describe
results in output:
Consumer group 'my-group' has no active members. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG FirstTopic 0 1230 1230 0 SecondTopic 0 1022 1022 0
I've tried other options like resetting to explicit offset or specifying the topic directly but result is same. The output suggests that operation succeed while checking offsets with describe command or debugging shows that offset has not be changed.
Anyone succeed with resetting consumer offset within non-zookeeper brokers.
Use the kafka-consumer-groups.sh to change or reset the offset. You would have to specify the topic, consumer group and use the –reset-offsets flag to change the offset.
The default is 5 seconds. Second, use auto. offset. reset to define the behavior of the consumer when there is no committed position (which would be the case when the group is first initialized) or when an offset is out of range.
The auto offset reset consumer configuration defines how a consumer should behave when consuming from a topic partition when there is no initial offset. This is most typically of interest when a new consumer group has been defined and is listening to a topic for the first time.
Use the kafka-consumer-groups.sh to change or reset the offset. You would have to specify the topic, consumer group and use the –reset-offsets flag to change the offset. kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group my-group --reset-offsets --to-earliest --all-topics --execute Consumer offset reset options
Use the kafka-consumer-groups along with the consumer group id followed by a describe. You will see 2 entries related to offsets – CURRENT-OFFSET and LOG-END-OFFSET for the partitions in the topic for that consumer group. CURRENT-OFFSET is the current offset for the partition in the consumer group.
Reset the consumer offset for a topic (execute) kafka-consumer-groups --bootstrap-server < kafkahost:port > --group < group_id > --topic < topic_name > --reset-offsets --to-earliest --execute This will execute the reset and reset the consumer group offset for the specified topic back to 0. Repeat 1 to check if the reset is successful
For example, if Kafka has a retention of 7 days, and your consumer is down for more than 7 days, the offsets are "invalid" as they will be deleted. In this case, consumers have a choice to either start reading from the beginning of the partition or from the end of the partition.
By default, --reset-offsets
just prints the result of the operation. To actually perform the operation you need to add --execute
to your command:
kafka-consumer-groups.bat --bootstrap-server kafka-host:9092 --group my-group --reset-offsets --to-earliest --all-topics --execute
Though the accepted answer perfectly answers OP question, there are more parameters available to reset offsets. So adding this answer to extend accepted answer.
To reset offset of all topics to earliest in the consumer group
kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group <group_name> --reset-offsets --to-earliest --all-topics --execute
To reset offset of specific topic to earliest in the consumer group
kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group <group_name> --reset-offsets --to-earliest --topic <my-topic> --execute
To reset offset of specific topic to specific offset in the consumer group
kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group <group_name> --reset-offsets --to-offset 1000 --topic <my-topic> --execute
Other supported arguments:
--shift-by [positive or negative integer] - Shifts offset forward or backward from given integer.
--to-current and --to-latest are same as --to-offset and --to-earliest.
--to-datetime [Datetime format is yyyy-MM-ddTHH:mm:ss.xxx]
kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group <group_name> --reset-offsets --to-datetime 2017-08-04T00:00:00.000 [ --all-topics or --topic <topic-name> ] --execute
--by-duration [Format is PnDTnHnMnS]
kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group <group_name> --reset-offsets --by-duration PT0H10M0S [ --all-topics or --topic <topic-name> ] --execute
Reset to offset by duration from current timestamp.
How to validate?
Use below command to check current/end of offsets and to confirm reset made the cahnges.
kafka-consumer-groups.sh --bootstrap-server <kafka_host:port> --group <group_id> --describe
Sample output:
Consumer group 'group1' has no active members. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID intro 0 0 99 99 - - -
Possible Errors:
Error: Assignments can only be reset if the group '[group_name]' is inactive, but the current state is Stable.
'Stable' means, there is an active consumer running for this group. So first you have to stop the active consumer(s) and retry resetting offsets.
It is not possible to reset offsets if there is an active consumer for the consumer group.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With