I have a single Kafka consumer connected to a topic with 3 partitions. As soon as I get a record from Kafka, I would like to capture its offset and partition. On restart, I would like to restore the consumer's position from the last read offset.
From the Kafka documentation:
Each record comes with its own offset, so to manage your own offset you just need to do the following:
Configure enable.auto.commit=false
Use the offset provided with each ConsumerRecord to save your position.
On restart restore the position of the consumer using seek(TopicPartition, long).
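As a minimal sketch of the first step, the consumer settings could be assembled like this. The configuration keys are standard Kafka consumer settings; the class name `ManualOffsetConfig`, the broker address, and the group id are placeholders assumed for illustration.

```java
import java.util.Properties;

// Hypothetical helper: builds consumer settings with auto-commit disabled.
final class ManualOffsetConfig {

    static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // The application, not the consumer, decides when a position is saved.
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address and group id.
        Properties props = consumerProperties("localhost:9092", "offset-demo");
        System.out.println(props);
    }
}
```

The resulting `Properties` object is what would be passed to the `KafkaConsumer` constructor.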
Here is my sample code:
constructor {
    load data into offsetMap<partition, offset>
    initFlag = true;
}

Main method
{
    ConsumerRecords<String, String> records = consumer.poll(100);
    if (initFlag) // is this correct way to override offset position?
    {
        seekToPositions(offsetMap);
        initFlag = false;
    }
    while (!shutdown)
    {
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            getOffsetPositions(); // dump offsets and partitions to db/disk
        }
    }
}

// get current offset and write to a file
public synchronized Map<Integer, Long> getOffsetPositions() throws Exception {
    Map<Integer, Long> offsetMap = new HashMap<Integer, Long>();
    // code to put partition and offset into map
    // write to disk or db
}

// Overrides the fetch offsets that the consumer will use on the next poll
public synchronized void seekToPositions(Map<Integer, Long> offsetMap) {
    // code to get partitions and offsets from offsetMap
    consumer.seek(partition, offset);
}
Is this the right way to do it? Is there a better way?
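For the "write to disk or db" part of the question, here is a rough sketch of a file-backed store, assuming a local properties file is acceptable storage. The class `FileOffsetStore` and its method names are hypothetical and not part of the Kafka API. It records the next offset to read (last processed offset + 1), since that is the position a consumer resumes from.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper: keeps partition -> next offset to read, persisted to a file.
final class FileOffsetStore {
    private final Path file;
    private final Map<Integer, Long> nextOffsets = new HashMap<>();

    FileOffsetStore(Path file) {
        this.file = file;
    }

    // Call after a record has been processed; the resume position is offset + 1.
    synchronized void markProcessed(int partition, long offset) {
        nextOffsets.put(partition, offset + 1);
    }

    // Write to a temporary file first, then move it over the real one, which
    // reduces the chance of leaving a half-written file after a crash.
    synchronized void save() throws IOException {
        Properties props = new Properties();
        for (Map.Entry<Integer, Long> e : nextOffsets.entrySet()) {
            props.setProperty(Integer.toString(e.getKey()), Long.toString(e.getValue()));
        }
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        try (OutputStream out = Files.newOutputStream(tmp)) {
            props.store(out, "partition=nextOffset");
        }
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
    }

    // Returns the saved positions, or an empty map if nothing was saved yet.
    synchronized Map<Integer, Long> load() throws IOException {
        nextOffsets.clear();
        if (Files.exists(file)) {
            Properties props = new Properties();
            try (InputStream in = Files.newInputStream(file)) {
                props.load(in);
            }
            for (String key : props.stringPropertyNames()) {
                nextOffsets.put(Integer.valueOf(key), Long.valueOf(props.getProperty(key)));
            }
        }
        return new HashMap<>(nextOffsets);
    }
}
```

On restart, each loaded entry would be passed to `consumer.seek(new TopicPartition(topic, partition), nextOffset)`. Note that `seek` only works for partitions currently assigned to the consumer: with `subscribe()`, the usual place for it is `ConsumerRebalanceListener.onPartitionsAssigned`, while with `assign()` it can be called right after the assignment.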
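If you commit your offsets, Kafka will store them for you (for up to 24 hours by default on the 0.10.x brokers referenced here; the broker default for offsets.retention.minutes was raised to 7 days in Kafka 2.0).
That way, if your consumer dies, you can start the same code on another machine and continue right from where you left off. No external storage is needed.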
See "Offsets and Consumer Position" in https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
I also recommend that you consider using commitSync.