
Kafka consumer: how to re-read the same message when its offset has not been committed and auto commit is disabled

I have switched off auto commit, and I am not committing the offset manually from the consumer after reading either.

I checked that the consumer lag stays the same, which confirms that the offset is not being committed. The problem is that the consumer still moves on to the next message instead of returning the same message again.

How can I keep reading the same message again and again? I should be able to read the next message only after the previous offset has been committed. Please help me with this.

Rahul asked Dec 14 '22 14:12



1 Answer

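Disabling auto commit does not stop the consumer from advancing. The committed offset is only used as the starting point when a consumer starts up or when partitions are reassigned; while it is running, the consumer tracks its own in-memory position, and every poll() continues from there. If you know which partition your Kafka consumer is currently reading, you can move that position back with the KafkaConsumer.seek(TopicPartition, offset) method, which tells the consumer to read from a particular offset. Example: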

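import java.time.Duration
import org.apache.kafka.common.TopicPartition

// Partition and offset of the record that was just read
val partition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition())
val recordOffset: Long = consumerRecord.offset()

// processingFailed is a placeholder for your own failure check
if (processingFailed)
  consumer.seek(partition, recordOffset) // rewind so the same record is fetched again

// If processing failed, the next poll returns records starting at recordOffset
val records = consumer.poll(Duration.ofMillis(100))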
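
The seek call above can be folded into the poll loop so that a partition only moves forward once a record has been handled. The following is a minimal sketch, not a definitive implementation. It assumes kafka-clients 2.0 or later, Scala 2.13, String keys and values, and enable.auto.commit=false. The names RetryUntilCommitted, pollOnce and handle are hypothetical and chosen only for illustration.

```scala
import java.time.Duration
import java.util.Collections
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import scala.jdk.CollectionConverters._

object RetryUntilCommitted {
  // Handles the records of a single poll, partition by partition.
  // `handle` is a hypothetical callback that returns true on success.
  def pollOnce(consumer: Consumer[String, String],
               handle: ConsumerRecord[String, String] => Boolean): Unit = {
    val records = consumer.poll(Duration.ofMillis(100))
    for (tp <- records.partitions().asScala) {
      val batch = records.records(tp).asScala
      // find stops at the first record whose handling fails,
      // so later records in the batch are not processed.
      val failed = batch.find(r => !handle(r))
      // A committed offset means "next offset to read": either the failed
      // record itself, or one past the last record when everything succeeded.
      val resumeAt = failed.map(_.offset()).getOrElse(batch.last.offset() + 1)
      consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(resumeAt)))
      // Rewind the in-memory position so the next poll returns the failed record again.
      failed.foreach(r => consumer.seek(tp, r.offset()))
    }
  }
}
```

Calling pollOnce repeatedly, for example in a while (true) loop, delivers a failing record again on every poll until handle returns true for it. A record that can never be processed therefore blocks its partition, so some retry limit is usually worth adding.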
ShailyAggarwal answered May 18 '23 19:05
