Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spring kafka consumer, seek offset at runtime?

I am using the KafkaMessageListenerContainer for consuming from the kafka topic, I have an application logic to process each record which is dependent on other micro services as well. I am now manually committing the offset after each record is processed.

But if I the application logic fails I need to seek to the failed offset and keep processing it until it's succeeds. For that I need to do a run time manual seek of the last offset.

Is this possible with the KafkaMessageListenerContainer yet ?

like image 248
sash Avatar asked Mar 24 '17 10:03

sash


People also ask

How Kafka consumer maintains offset?

As each message is received by Kafka, it allocates a message ID to the message. Kafka then maintains the message ID offset on a by consumer and by partition basis to track consumption. Kafka brokers keep track of both what is sent to the consumer and what is acknowledged by the consumer by using two offset values.

Where does Kafka store offsets for consumers?

Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of a record within that partition, and also denotes the position of the consumer in the partition.

Is Kafka offset continuous?

If the offset is committed successfully, after the consumer restarts, it can continue consuming from the committed offset. Kafka's offset is continuous as it follows the following constraints: The first message's offset is 0. If the latest message's offset is N, then the next message's offset will be N+1.


1 Answers

See Seeking to a Specific Offset.

In order to seek, your listener must implement ConsumerSeekAware which has the following methods:

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

The first is called when the container is started; this callback should be used when seeking at some arbitrary time after initialization. You should save a reference to the callback; if you are using the same listener in multiple containers (or in a ConcurrentMessageListenerContainer) you should store the callback in a ThreadLocal or some other structure keyed by the listener Thread.

like image 152
Gary Russell Avatar answered Oct 12 '22 00:10

Gary Russell