I need to consume specific offset to specific end offset!! consumer.seek() reads the data from specific offset but I need retrieve the data fromoffset to tooffset !! Any help will be appreciate , thanks in advance.
ConsumerRecords<String, String> records = consumer.poll(100);
if(flag) {
consumer.seek(new TopicPartition("topic-1", 0), 90);
flag = false;
}
To read messages from a start offset to an end offset, you first need to use seek()
to move the consumer at the desired starting location and then poll()
until you hit the desired end offset.
For example, to consume from offset 100 to 200:
String topic = "test";
TopicPartition tp = new TopicPartition(topic, 0);
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs)) {
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Move to the desired start offset
consumer.seek(tp, 100L);
}
});
boolean run = true;
long lastOffset = 200L;
while (run) {
ConsumerRecords<String, String> crs = consumer.poll(Duration.ofMillis(100L));
for (ConsumerRecord<String, String> record : crs) {
System.out.println(record);
if (record.offset() == lastOffset) {
// Reached the end offsey, stop consuming
run = false;
break;
}
}
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With