
How to reset offsets to arbitrary value in Kafka Consumer Group?

Tags: apache-kafka

I want to reset the offsets of all partitions to specific values. I see that kafka-consumer-groups.sh provides a --from-file option ("Reset offsets to values defined in CSV file").

Can anyone please share the contents/format of this CSV file and an example command for it?

For example:

./kafka_2.12-2.1.1/bin/kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKER} --group ${GROUP_NAME} --topic ${TOPIC} --reset-offsets --from-file offsets.csv --execute

What should offsets.csv contain, and in what format?

mzlo asked Apr 02 '19 14:04


3 Answers

The CSV file format is as follows (each line describes one partition):

topicName,partitionNumber,offset
topicName,partitionNumber,offset

Sample CSV content (reset-policy.csv):

someTopic1,0,1
someTopic2,1,5

The command to reset offsets based on the CSV file is:

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group gr1 --from-file reset-policy.csv --reset-offsets --execute
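
If the target offsets are already known inside a program, a file in the format above can be generated rather than typed by hand. The following is only a minimal sketch: OffsetCsvWriter and toCsv are hypothetical names, not part of Kafka, and the sketch assumes one desired offset per partition of a single topic, with partitions numbered from 0.

```java
import java.util.StringJoiner;

// Hypothetical helper (not part of Kafka): renders one
// "topicName,partitionNumber,offset" line per partition.
final class OffsetCsvWriter {

    // offsetsByPartition[i] is the desired offset for partition i of the topic.
    static String toCsv(String topic, long[] offsetsByPartition) {
        StringJoiner csv = new StringJoiner("\n");
        for (int partition = 0; partition < offsetsByPartition.length; partition++) {
            csv.add(topic + "," + partition + "," + offsetsByPartition[partition]);
        }
        return csv.toString();
    }

    public static void main(String[] args) {
        // Writes the CSV for a three-partition topic to standard output;
        // redirect it to a file and pass that file to --from-file.
        System.out.println(toCsv("someTopic1", new long[] {1L, 5L, 42L}));
    }
}
```

Redirecting the output to reset-policy.csv gives a file that can be passed to the command above.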

Bartosz Wardziński answered Nov 08 '22 05:11


@wardzinski's answer provides the key information requested, but here is a useful addition:

You can use the --export option of kafka-consumer-groups to create the CSV file from existing information. Combined with --dry-run, it changes nothing on the cluster. For example:

bin/kafka-consumer-groups \
  --bootstrap-server $KAFKA \
  --export --group $GROUP_NAME --topic $TOPIC \
  --reset-offsets --to-current \
  --dry-run

The --to-current option can be replaced with other reset options, such as --to-datetime or --by-period.

The output of that command is the CSV content required by --from-file.

One very useful use case for this is to copy the offsets from one consumer group to another consumer group, for example:

bin/kafka-consumer-groups \
  --bootstrap-server $KAFKA \
  --export --group $FROM_GROUP_NAME --topic $TOPIC \
  --reset-offsets --to-current \
  --dry-run > offsets.txt

bin/kafka-consumer-groups \
  --bootstrap-server $KAFKA \
  --execute --group $TO_GROUP_NAME \
  --reset-offsets --from-file offsets.txt
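
Because the exported file is plain text, it can also be adjusted before it is passed to --from-file, for instance to rewind every partition by a fixed number of messages. The sketch below is only an illustration under stated assumptions: OffsetCsvShift and shift are hypothetical names, the offset is assumed to be the last comma-separated field of each line, and results are clamped at 0. It does not check that the new offsets exist in the topic's log.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of Kafka): shifts the offset column of
// exported "topic,partition,offset" lines by a fixed delta.
final class OffsetCsvShift {

    static List<String> shift(List<String> lines, long delta) {
        List<String> shifted = new ArrayList<>();
        for (String line : lines) {
            if (line.trim().isEmpty()) {
                continue; // ignore blank lines
            }
            int lastComma = line.lastIndexOf(',');
            if (lastComma < 0) {
                throw new IllegalArgumentException("Not a CSV offset line: " + line);
            }
            long offset = Long.parseLong(line.substring(lastComma + 1).trim());
            long adjusted = Math.max(0L, offset + delta); // never go below offset 0
            shifted.add(line.substring(0, lastComma + 1) + adjusted);
        }
        return shifted;
    }

    public static void main(String[] args) {
        // Rewinds each partition by 5 messages and prints the adjusted lines.
        List<String> exported = Arrays.asList("someTopic1,0,10", "someTopic1,1,3");
        for (String line : shift(exported, -5L)) {
            System.out.println(line);
        }
    }
}
```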
Raman answered Nov 08 '22 05:11


If you plan to alter the offsets from within a Java application, you can use the AdminClient method alterConsumerGroupOffsets.

Here is a simple example tested with Kafka 2.8.0:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;

public class ResetOffsetExample {
  public static void main(String[] args) {
    String brokers = "localhost:9092";
    String consumerGroupName = "test1337";
    TopicPartition topicPartition = new TopicPartition("test", 0);
    long offset = 4L;

    // Target offset for each partition that should be altered
    Map<TopicPartition, OffsetAndMetadata> toOffset = new HashMap<>();
    toOffset.put(topicPartition, new OffsetAndMetadata(offset));

    // Create AdminClient
    final Properties properties = new Properties();
    properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    AdminClient adminClient = AdminClient.create(properties);

    try {
      // Check offsets before altering
      KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> offsetsBeforeResetFuture = adminClient.listConsumerGroupOffsets(consumerGroupName).partitionsToOffsetAndMetadata();
      System.out.println("Before: " + offsetsBeforeResetFuture.get().toString());

      // Alter offsets (the consumer group must have no active members for this to succeed)
      adminClient.alterConsumerGroupOffsets(consumerGroupName, toOffset).partitionResult(topicPartition).get();

      // Check offsets after altering
      KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> offsetsAfterResetFuture = adminClient.listConsumerGroupOffsets(consumerGroupName).partitionsToOffsetAndMetadata();
      System.out.println("After:  " + offsetsAfterResetFuture.get().toString());
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    } finally {
      adminClient.close();
    }
  }
}

This prints:

Before: {test-0=OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}}
After:  {test-0=OffsetAndMetadata{offset=4, leaderEpoch=null, metadata=''}}

You can extend that example to load a CSV file that contains the consumer group, topic, partition, and new offset for each partition.
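
The CSV-loading step could look roughly like the sketch below. It is deliberately free of Kafka dependencies, and the names OffsetCsvParser and PartitionOffset are hypothetical. It assumes the three-column topic,partition,offset layout used by kafka-consumer-groups for a single group, so a consumer group column would have to be handled separately. Each parsed entry could then be turned into a TopicPartition and an OffsetAndMetadata and put into the toOffset map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical value type (not part of Kafka): one parsed CSV line.
final class PartitionOffset {
    final String topic;
    final int partition;
    final long offset;

    PartitionOffset(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

// Hypothetical helper: parses "topic,partition,offset" lines.
final class OffsetCsvParser {

    static List<PartitionOffset> parse(List<String> lines) {
        List<PartitionOffset> result = new ArrayList<>();
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue; // skip blank lines
            }
            String[] fields = trimmed.split(",");
            if (fields.length != 3) {
                throw new IllegalArgumentException(
                        "Expected topic,partition,offset but got: " + line);
            }
            String topic = fields[0].trim();
            int partition = Integer.parseInt(fields[1].trim());
            long offset = Long.parseLong(fields[2].trim());
            if (partition < 0 || offset < 0) {
                throw new IllegalArgumentException("Negative value in: " + line);
            }
            result.add(new PartitionOffset(topic, partition, offset));
        }
        return result;
    }

    public static void main(String[] args) {
        // In a real application the lines would come from
        // java.nio.file.Files.readAllLines(path).
        for (PartitionOffset po : parse(Arrays.asList("someTopic1,0,1", "someTopic2,1,5"))) {
            System.out.println(po.topic + "-" + po.partition + " -> " + po.offset);
        }
    }
}
```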

Michael Heil answered Nov 08 '22 05:11