
How does an offset expire for an Apache Kafka consumer group?

Tags:

apache-kafka

I was running some tests on an old topic when I noticed some strange behaviour. Reading Kafka's log, I noticed this "Removed 8 expired offsets" message:

[GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 37 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 37 (kafka.coordinator.GroupCoordinator)
Deleting segment 0 from log __consumer_offsets-31. (kafka.log.Log)
Deleting segment 0 from log __consumer_offsets-45. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-45/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-31/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-13. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-13/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-11. (kafka.log.Log)
Deleting segment 4885 from log __consumer_offsets-11. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-11/00000000000000004885.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-11/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-26. (kafka.log.Log)
Deleting segment 12406 from log __consumer_offsets-26. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-26/00000000000000012406.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-26/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-22. (kafka.log.Log)
Deleting segment 8643 from log __consumer_offsets-22. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-22/00000000000000008643.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-22/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-6. (kafka.log.Log)
Deleting segment 9757 from log __consumer_offsets-6. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-6/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-6/00000000000000009757.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-14. (kafka.log.Log)
Deleting segment 1 from log __consumer_offsets-14. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-14/00000000000000000001.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-14/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
[GroupCoordinator 1001]: Preparing to restabilize group GROUP_NAME with old generation 37 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 38 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 38 (kafka.coordinator.GroupCoordinator)
[Group Metadata Manager on Broker 1001]: Removed 8 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)

In fact, I have 2 questions:

  1. How does this offset expiration work for a consumer group?

  2. Can these expired offsets explain why my consumer would not poll anything when it had auto.offset.reset = latest, but polled from the last committed offset when it had auto.offset.reset = earliest?

Enzo asked Aug 24 '16 19:08


2 Answers

Update

Since Apache Kafka 2.1, offsets are not deleted as long as the consumer group is active, regardless of whether the consumers commit offsets or not. That is, the offsets.retention.minutes clock only starts to tick when the group becomes empty (in older releases, the clock started to tick as soon as the commit happened).

Cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets
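To make the difference between the two behaviours concrete, here is a minimal Python sketch of the expiration rule as described above. It is a simplified model, not broker code: the function name `offset_expired` and all of its parameters are hypothetical, and real brokers handle more cases (for example standalone consumers) than this covers.

```python
from typing import Optional


def offset_expired(now_ms: int,
                   commit_ms: int,
                   retention_ms: int,
                   group_empty_since_ms: Optional[int],
                   kip_211: bool) -> bool:
    """Simplified model of broker-side offset expiration (hypothetical helper).

    group_empty_since_ms is None while the group still has members.
    kip_211 selects the 2.1+ semantics; False models older releases.
    """
    if not kip_211:
        # Older releases: the retention clock starts at commit time,
        # even if the group is still running.
        return now_ms - commit_ms >= retention_ms
    if group_empty_since_ms is None:
        # 2.1+: offsets of a group with members are kept.
        return False
    # 2.1+: the retention clock starts when the group became empty.
    return now_ms - group_empty_since_ms >= retention_ms


DAY_MS = 24 * 60 * 60 * 1000
retention = 1 * DAY_MS  # assumed retention of one day, for illustration only

# A running group whose last commit for some partition was two days ago:
print(offset_expired(2 * DAY_MS, 0, retention, None, kip_211=False))  # True
print(offset_expired(2 * DAY_MS, 0, retention, None, kip_211=True))   # False
```

Under the older rule the offset is gone even though the group never stopped, which is the situation the original answer below describes.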

Original Answer

By default, Kafka deletes committed offsets after a configurable period of time; see the offsets.retention.minutes parameter. That is, if a consumer group is inactive (i.e., does not commit any offsets) for this amount of time, its offsets get deleted. Thus, even if the consumer is running, if it does not commit offsets for some partitions, those offsets are subject to offsets.retention.minutes.

If you start a consumer, the following happens:

  1. look for a (valid) committed offset (for the consumer group)
    1. if valid offset is found, resume from there
    2. if no valid offset is found, reset offset according to auto.offset.reset parameter

Thus, if your offsets got deleted and auto.offset.reset = latest, your consumer will not poll anything until new data is added to the topic. If auto.offset.reset = earliest, it should consume the whole topic.
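The start-up logic above can be sketched in a few lines of Python. This is only an illustration of the decision, not the consumer's actual implementation: `resolve_start_position` and its parameters are hypothetical names, and treating an out-of-range committed offset as "not valid" is an assumption about what "valid" means here.

```python
from typing import Optional


def resolve_start_position(committed: Optional[int],
                           log_start: int,
                           log_end: int,
                           auto_offset_reset: str) -> int:
    """Pick the offset a consumer starts reading from (hypothetical helper).

    committed is None when no committed offset exists, e.g. after expiration.
    log_start / log_end are the first available and the next-to-be-written
    offsets of the partition.
    """
    # Step 1: a valid committed offset wins.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # Step 2: otherwise fall back to the reset policy.
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    if auto_offset_reset == "none":
        # The real consumer raises an exception in this case; LookupError
        # stands in for it in this sketch.
        raise LookupError("no valid committed offset and auto.offset.reset=none")
    raise ValueError(f"unknown auto.offset.reset value: {auto_offset_reset!r}")


# Offsets expired (committed=None) on a partition holding offsets 0..99:
print(resolve_start_position(None, 0, 100, "latest"))    # 100
print(resolve_start_position(None, 0, 100, "earliest"))  # 0
# Committed offset still present:
print(resolve_start_position(42, 0, 100, "latest"))      # 42
```

With `latest`, the start position equals the end of the log, so the first polls return nothing until a producer appends new records.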

See these JIRA tickets for a discussion of this: https://issues.apache.org/jira/browse/KAFKA-3806 and https://issues.apache.org/jira/browse/KAFKA-4682

Matthias J. Sax answered Sep 21 '22 15:09


Check my answer here. Don't forget about log file rolling: it affects when offset files are removed.

yuranos answered Sep 21 '22 15:09