I have switched off auto commit and not committing offset also from consumer after reading.
Checked consumer lag is also remaining same, it ensures that offset is not getting committed. But the problem is, it is consuming next msg not the same message again.
How can I keep reading same message again and again. I should be able to read next message only if previous offset has been committed. Please help me here doing this.
If you know which partition your kafka consumer is currently accessing, you can use kafkaconsumer.seek(partition, offset)
method to tell your consumer to read message from a particular offset.
Example:
//to get the partition from consumer record
val partition: Int = consumerRecord.partition()
//to get offset of current record
val recordOffset = consumerRecord.offset()
if(data processing fail condition)
consumer.seek(new TopicPartition(topic, partition), recordOffset )
//will return records from recordOffset now, if data processing fail condition was true
consumer.poll(100)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With