
Kafka Restart from same offset

I have a single Kafka consumer connected to a topic with 3 partitions. As soon as I get a record from Kafka, I capture its offset and partition. On restart, I would like to restore the consumer's position from the last read offset.

From kafka documentation:

Each record comes with its own offset, so to manage your own offset you just need to do the following:

Configure enable.auto.commit=false

Use the offset provided with each ConsumerRecord to save your position.

On restart restore the position of the consumer using seek(TopicPartition, long).

Here is my sample code:

public MyConsumer() {
    offsetMap = loadOffsets(); // read saved Map<partition, offset> from disk/db
    initFlag = true;
}

public void run() {
    while (!shutdown) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        if (initFlag) { // override the fetch positions once, after the first poll
                        // (is this the correct way to override the offset position?)
            seekToPositions(offsetMap);
            initFlag = false;
            continue; // discard this batch and re-poll from the restored offsets
        }
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
            getOffsetPositions(); // dump offsets and partitions to db/disk
        }
    }
}

// Get the current offset of each assigned partition and write it to a file/db
public synchronized Map<Integer, Long> getOffsetPositions() throws Exception {
    Map<Integer, Long> offsets = new HashMap<>();
    for (TopicPartition tp : consumer.assignment()) {
        offsets.put(tp.partition(), consumer.position(tp));
    }
    // write offsets to disk or db
    return offsets;
}

// Overrides the fetch offsets that the consumer would otherwise use
public synchronized void seekToPositions(Map<Integer, Long> offsetMap) {
    for (Map.Entry<Integer, Long> entry : offsetMap.entrySet()) {
        TopicPartition partition = new TopicPartition(topic, entry.getKey());
        consumer.seek(partition, entry.getValue());
    }
}

Is this the right way to do it? Is there a better way?

Asked Sep 03 '25 by Manuj Kathuria

1 Answer

If you commit your offsets Kafka will store them for you (for up to 24 hours by default).

That way if your consumer dies you could start the same code on another machine and continue right from where you left off. No external storage needed.

See "Offsets and Consumer Position" in https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

I also recommend you consider using commitSync().
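A minimal sketch of that approach: disable auto-commit, process each batch, then commit synchronously so Kafka stores the position under the group id. The broker address, group id, and topic name below are made-up placeholders for illustration.

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CommittingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "my-group");                // assumed group id
        props.put("enable.auto.commit", "false");         // commit manually instead
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));    // assumed topic name
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, value = %s%n",
                            record.offset(), record.value());
                }
                // Commit only after the batch is fully processed; on restart,
                // a consumer with the same group.id resumes from here.
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }
}

With this, restarting the process (or starting it on another machine) with the same group.id picks up from the last committed offset automatically, so no external offset store is needed.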

Answered Sep 05 '25 by Hans Jespersen