How to change start offset for topic?

Is it possible to change the start offset for a new topic? I would like to create a new topic and start reading from offset 10000. How can I do that?

Piotr Sobolewski asked Apr 22 '15 08:04




2 Answers

Since Kafka 0.11.0.0 you can use the kafka-consumer-groups.sh script. Example from this answer:

kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group my-group --reset-offsets --to-earliest --all-topics --execute 

Other options are listed in KIP-122 (Add Reset Consumer Group Offsets tooling):

Scenario            | Arguments                                   | Example
--------------------+---------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------
Reset to Datetime   | --to-datetime YYYY-MM-DDTHH:mm:SS.sss±hh:mm | Reset to first offset since 01 January 2017, 00:00:00 hrs: --reset-offsets --group test.group --topic foo --to-datetime 2017-01-01T00:00:00Z
Reset by Duration   | --by-duration PnDTnHnMnS                    | Reset to first offset since one week ago (from current timestamp): --reset-offsets --group test.group --topic foo --by-duration P7D
Reset to Earliest   | --to-earliest                               | Reset to earliest offset available: --reset-offsets --group test.group --topic foo --to-earliest
Reset to Latest     | --to-latest                                 | Reset to latest offset available: --reset-offsets --group test.group --topic foo --to-latest
Reset to Offset     | --to-offset                                 | Reset to offset 1 in all partitions: --reset-offsets --group test.group --topic foo --to-offset 1
Shift Offset by 'n' | --shift-by n                                | Reset to current offset plus 5 positions: --reset-offsets --group test.group --topic foo --shift-by 5
Reset from File     | --from-file PATH_TO_FILE                    | Reset using a file with reset plan: --reset-offsets --group test.group --from-file reset-plan.csv

You can also specify which partitions to reset, for example:

  • Reset offset of topic foo partition 0 to 1

    --reset-offsets --group test.group --topic foo:0 --to-offset 1

  • Reset offset of topic foo partition 0,1,2 to earliest

    --reset-offsets --group test.group --topic foo:0,1,2 --to-earliest

Reminder: don't forget the --execute flag (see the execution options in the KIP). Without this flag, the script only prints the result the reset would produce for the selected scope, for example:

TOPIC                 PARTITION NEW-OFFSET NEW-LAG LOG-END-OFFSET CONSUMER-ID HOST CLIENT-ID
foo                   0         90         10      100            -           -    -
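
For the original question (start reading at offset 10000), the options above could be combined roughly as follows. This is only a sketch: reset_cmd is a hypothetical helper name, and kafka-host:9092, my-group and my-topic are placeholders. It only prints the command and does not run it.

```shell
#!/usr/bin/env bash
# Sketch: build (but do not run) a kafka-consumer-groups.sh reset command.
# reset_cmd is a hypothetical helper; host, group and topic are placeholders.
reset_cmd() {
  local bootstrap="$1" group="$2" topic="$3" partitions="$4" offset="$5"
  local scope="$topic"
  # An optional partition list narrows the scope, e.g. my-topic:0,1,2
  if [ -n "$partitions" ]; then
    scope="$topic:$partitions"
  fi
  printf '%s\n' "kafka-consumer-groups.sh --bootstrap-server $bootstrap --group $group --reset-offsets --topic $scope --to-offset $offset"
}

# Prints the command for partition 0 of my-topic, target offset 10000.
reset_cmd kafka-host:9092 my-group my-topic 0 10000
```

Run the printed command as-is to preview the new offsets, then append --execute to apply them. As far as I know the tool refuses to reset a group that still has active consumers, so stop them first.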

Credits to this answer. Table created with ascii tables

gabrielgiussi answered Oct 09 '22 05:10



You can do this with the ZooKeeper shell. Kafka's older ZooKeeper-based consumers store their offsets in ZooKeeper.

Go to the Kafka bin directory and start the ZooKeeper shell (my Kafka version is 0.8.0):

./zookeeper-shell.sh localhost:2181 

Now use the ZooKeeper get command:

get /consumers/consumer_group_id/offsets/topic/0 

It shows something like:

2043
cZxid = 0x4d
ctime = Wed Mar 18 03:56:32 EDT 2015
...

Here 2043 is the maximum offset consumed. Set it to the desired value with the ZooKeeper set command:

set /consumers/consumer_group_id/offsets/topic/0 10000 

The path has the form /consumers/[consumer_group_id]/offsets/[topic]/[partition_id].
Substitute the appropriate consumer group, topic and partition id.
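
If the topic has several partitions, each one has its own path, so the set command has to be repeated per partition. A small sketch that prints those commands, assuming the path layout above; zk_offset_path and zk_set_cmds are hypothetical helper names, and my-group and my-topic are placeholders:

```shell
#!/usr/bin/env bash
# Sketch: print zookeeper-shell "set" commands for several partitions.
# zk_offset_path and zk_set_cmds are hypothetical helper names.
zk_offset_path() {
  # $1 = consumer group, $2 = topic, $3 = partition id
  printf '/consumers/%s/offsets/%s/%s\n' "$1" "$2" "$3"
}

zk_set_cmds() {
  # $1 = group, $2 = topic, $3 = target offset, remaining args = partition ids
  local group="$1" topic="$2" offset="$3"
  shift 3
  local p
  for p in "$@"; do
    printf 'set %s %s\n' "$(zk_offset_path "$group" "$topic" "$p")" "$offset"
  done
}

# Prints one "set" line each for partitions 0, 1 and 2.
zk_set_cmds my-group my-topic 10000 0 1 2
```

The printed lines can be pasted into the zookeeper-shell.sh session. It is probably safest to stop the consumers in that group first, since a running consumer may overwrite the value on its next offset commit.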

Note: since you mentioned it's a new instance of Kafka, I am not sure whether any consumers will have connected and created consumer groups yet.

Vishal John answered Oct 09 '22 03:10
