Is it possible to change the start offset for a new topic? I would like to create a new topic and start reading from offset 10000. How?
How do you change a consumer offset? Use kafka-consumer-groups.sh to change or reset the offset. You have to specify the topic, the consumer group, and the --reset-offsets flag.
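For the question's exact scenario, a minimal sketch, assuming a broker at localhost:9092 and placeholder group/topic names my-group and my-topic (the group must have no running consumers while you reset):

# my-group and my-topic are placeholder assumptions; adjust to your setup
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --topic my-topic --reset-offsets --to-offset 10000 --execute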
OFFSET IN KAFKA The offset is a unique, sequential id assigned to each message within a partition. Its most important use is to identify a message by its position in the partition; in other words, a consumer's offset is the position within a partition of the next message to be delivered to that consumer.
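To check which offsets a topic currently spans, you can print the per-partition log offsets with GetOffsetShell. A sketch, assuming a broker at localhost:9092 and a topic named my-topic; this classic tool takes --broker-list (newer Kafka versions accept --bootstrap-server instead):

# --time -1 prints the latest offset of each partition; --time -2 the earliest
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic my-topic --time -1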
Connect to each broker (from step 1) and delete the topic data folder, e.g. rm -rf /tmp/kafka-logs/MyTopic-0. Repeat for the other partitions and all replicas. Delete the topic metadata from ZooKeeper: run zkCli.sh, then rmr /brokers/topics/MyTopic. Finally, start the Apache Kafka daemon on each stopped machine.
Method summary (from the Java KafkaConsumer API):
assign(): manually assign a list of partitions to this consumer.
assignment(): get the set of partitions currently assigned to this consumer.
close(): close the consumer, waiting indefinitely for any needed cleanup.
commitSync(): commit the offsets returned on the last poll() for all the subscribed topics and partitions.
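The same manual-assignment idea is available from the shell: kafka-console-consumer.sh can read one partition starting at an absolute offset. A sketch, again with localhost:9092 and my-topic as placeholder assumptions:

# --offset requires --partition, since an absolute offset is only meaningful within one partition
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --partition 0 --offset 10000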
Since Kafka 0.11.0.0 you can use the script kafka-consumer-groups.sh.
Example from this answer:
kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group my-group --reset-offsets --to-earliest --all-topics --execute
Other options are listed in KIP-122: Add Reset Consumer Group Offsets tooling:
| Scenario | Arguments | Example |
|---|---|---|
| Reset to Datetime | --to-datetime YYYY-MM-DDTHH:mm:SS.sss±hh:mm | Reset to first offset since 01 January 2017, 00:00:00 hrs: --reset-offsets --group test.group --topic foo --to-datetime 2017-01-01T00:00:00Z |
| Reset by Duration | --by-duration PnDTnHnMnS | Reset to first offset since one week ago (from current timestamp): --reset-offsets --group test.group --topic foo --by-duration P7D |
| Reset to Earliest | --to-earliest | Reset to earliest offset available: --reset-offsets --group test.group --topic foo --to-earliest |
| Reset to Latest | --to-latest | Reset to latest offset available: --reset-offsets --group test.group --topic foo --to-latest |
| Reset to Offset | --to-offset | Reset to offset 1 in all partitions: --reset-offsets --group test.group --topic foo --to-offset 1 |
| Shift Offset by 'n' | --shift-by n | Reset to current offset plus 5 positions: --reset-offsets --group test.group --topic foo --shift-by 5 |
| Reset from File | --from-file PATH_TO_FILE | Reset using a file with reset plan: --reset-offsets --group test.group --from-file reset-plan.csv |
You can also specify which partitions to reset, for example:
Reset the offset of topic foo, partition 0, to 1:
--reset-offsets --group test.group --topic foo:0 --to-offset 1
Reset the offsets of topic foo, partitions 0, 1 and 2, to earliest:
--reset-offsets --group test.group --topic foo:0,1,2 --to-earliest
Reminder: don't forget the --execute flag (see the execution options in the KIP). Without this flag the script will only print out the planned result of the scenario by scope, for example:
TOPIC  PARTITION  NEW-OFFSET  NEW-LAG  LOG-END-OFFSET  CONSUMER-ID  HOST  CLIENT-ID
foo    0          90          10       100             -            -     -
Credits to this answer. Table created with ASCII Tables.
You can also do this with the help of the ZooKeeper shell; older Kafka versions use ZooKeeper to track consumer offsets.
Go to the Kafka bin directory and invoke the ZooKeeper shell (my Kafka version is 0.8.0):
./zookeeper-shell.sh localhost:2181
Now use the ZooKeeper get command:
get /consumers/consumer_group_id/offsets/topic/0
It shows something like:
2043
cZxid = 0x4d
ctime = Wed Mar 18 03:56:32 EDT 2015
...
Here 2043 is the last committed offset for this consumer group on the partition. Set it to the desired value using the ZooKeeper set command:
set /consumers/consumer_group_id/offsets/topic/0 10000
The path has the form /consumers/[consumer_group_id]/offsets/[topic]/[partition_id]; substitute the appropriate consumer group, topic and partition id.
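For the question's scenario, a sketch of the same update as a single non-interactive call (my-group and my-topic are placeholder assumptions; zookeeper-shell.sh runs a command passed as trailing arguments). Stop the group's consumers first, otherwise their next commit may overwrite the value:

# placeholder group/topic/partition; adjust to your setup
./zookeeper-shell.sh localhost:2181 set /consumers/my-group/offsets/my-topic/0 10000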
Also, since you mentioned it's a new instance of Kafka, I am not sure whether any consumers have connected yet and whether consumer groups have been created.