Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Is there a way to stop Kafka consumer at a specific offset?

Tags:

apache-kafka

I can seek to a specific offset. Is there a way to stop the consumer at a specific offset? In other words, consume till my given offset. As far as I know, Kafka does not offer such a function. Please correct me if I am wrong.

Eg. partition has offsets 1-10. I only want to consume from 3-8. After consuming the 8th message, program should exit.

like image 348
moon Avatar asked Oct 29 '22 02:10

moon


1 Answers

Yes, kafka does not offer this function, but you could achieve this in your consumer code. You could try use commitSync() to control this.

public void commitSync(Map offsets)

Commit the specified offsets for the specified list of topics and partitions. This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used. The committed offset should be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.

This is a synchronous commits and will block until either the commit succeeds or an unrecoverable error is encountered (in which case it is thrown to the caller).

Something like this:

 while (goAhead) {
     ConsumerRecords<String, String> records = consumer.poll(100);
     for (ConsumerRecord<String, String> record : records) {
         if (record.offset() > OFFSET_BOUND) {
            consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset())));
            goAhead = false;
            break;           
         }
         process(record);
     }
 }

You should set the "enable.auto.commit" to false in code above. In your case the OFFSET_BOUND could be set to 8. Because the commited offset is just 9 in your example, So next time the consumer will fetch from this position.

like image 182
GuangshengZuo Avatar answered Jan 02 '23 20:01

GuangshengZuo