I want to reset the offsets of all partitions to specific values. I see that kafka-consumer-groups.sh provides the option --from-file ("Reset offsets to values defined in CSV file").
Can anyone share the contents/format of this CSV file and an example command for it?
For example:
./kafka_2.12-2.1.1/bin/kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKER} --group ${GROUP_NAME} --topic ${TOPIC} --reset-offsets --from-file offsets.csv --execute
What are the contents/format of offsets.csv?
The CSV file format is as follows (each line describes one partition):
topicName,partitionNumber,offset
topicName,partitionNumber,offset
Sample CSV content (reset-policy.csv):
someTopic1,0,1
someTopic2,1,5
The command to reset offsets based on the CSV file is:
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group gr1 --from-file reset-policy.csv --reset-offsets --execute
@wardzinski's answer is the key information requested, but I can add the following useful tidbit:
You can use the --export option of kafka-consumer-groups to create the CSV file from existing information, without changing anything, by combining it with --dry-run. For example:
bin/kafka-consumer-groups \
--bootstrap-server $KAFKA \
--export --group $GROUP_NAME --topic $TOPIC \
--reset-offsets --to-current \
--dry-run
The value --to-current can be changed to various other values, such as --to-datetime, --by-duration, etc. The output of that command is the CSV file required by --from-file.
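For illustration, the exported output follows the same topicName,partitionNumber,offset layout described above; with hypothetical values it might look like:
someTopic1,0,42
someTopic2,1,17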
One very useful use case for this is to copy the offsets from one consumer group to another, for example:
bin/kafka-consumer-groups \
--bootstrap-server $KAFKA \
--export --group $FROM_GROUP_NAME --topic $TOPIC \
--reset-offsets --to-current \
--dry-run > offsets.txt
bin/kafka-consumer-groups \
--bootstrap-server $KAFKA \
--execute --group $TO_GROUP_NAME \
--reset-offsets --from-file offsets.txt
In case you plan to alter the offsets within a Java application, you can make use of the AdminClient API alterConsumerGroupOffsets.
Here is a simple example tested with Kafka 2.8.0:
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;

String brokers = "localhost:9092";
String consumerGroupName = "test1337";

// Target partition and the offset it should be reset to
TopicPartition topicPartition = new TopicPartition("test", 0);
Long offset = 4L;
Map<TopicPartition, OffsetAndMetadata> toOffset = new HashMap<>();
toOffset.put(topicPartition, new OffsetAndMetadata(offset));

// Create AdminClient
final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
AdminClient adminClient = AdminClient.create(properties);

try {
    // Check offsets before altering
    KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> offsetsBeforeResetFuture =
            adminClient.listConsumerGroupOffsets(consumerGroupName).partitionsToOffsetAndMetadata();
    System.out.println("Before: " + offsetsBeforeResetFuture.get().toString());

    // Alter offsets
    adminClient.alterConsumerGroupOffsets(consumerGroupName, toOffset).partitionResult(topicPartition).get();

    // Check offsets after altering
    KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> offsetsAfterResetFuture =
            adminClient.listConsumerGroupOffsets(consumerGroupName).partitionsToOffsetAndMetadata();
    System.out.println("After: " + offsetsAfterResetFuture.get().toString());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
} finally {
    adminClient.close();
}
This will print out
Before: {test-0=OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}}
After: {test-0=OffsetAndMetadata{offset=4, leaderEpoch=null, metadata=''}}
You can extend that example to load a CSV file that contains the topic, partition, and new offset for each partition of the consumer group.
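Here is a minimal sketch of that extension, assuming the three-field topicName,partitionNumber,offset layout described above and a hypothetical file name offsets.csv; it would slot into the try block of the example above, with java.io.IOException added to the caught exceptions:
import java.nio.file.Files;
import java.nio.file.Paths;

// Build the offsets map from a CSV file whose lines have the form topic,partition,offset.
// The file name "offsets.csv" is an assumption for this example.
Map<TopicPartition, OffsetAndMetadata> toOffsets = new HashMap<>();
for (String line : Files.readAllLines(Paths.get("offsets.csv"))) {
    String[] fields = line.split(",");
    TopicPartition tp = new TopicPartition(fields[0], Integer.parseInt(fields[1]));
    toOffsets.put(tp, new OffsetAndMetadata(Long.parseLong(fields[2])));
}

// Alter all listed partitions in one call, reusing adminClient and consumerGroupName from above
adminClient.alterConsumerGroupOffsets(consumerGroupName, toOffsets).all().get();
Passing the whole map lets a single alterConsumerGroupOffsets call reset every partition listed in the file, mirroring what --from-file does on the command line.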