I'm playing with Kafka on my local machine, and I have added the following topic configuration:
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --config retention.ms=60000
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --config file.delete.delay.ms=40000
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --config segment.bytes=400000
My understanding is that a segment will be deleted when it reaches the segment size defined above (segment.bytes=400000) AND every single message within the segment is older than the retention time defined above (retention.ms=60000).
What I noticed is that a segment of just 35 bytes, which contained just one message, was deleted after the minute (maybe a little more).
Where did I get that information? From a post by a LinkedIn engineer about how the deletion process works:
Retention is going to be based on a combination of both the retention and segment size settings (as a side note, it's recommended to use log.retention.ms and log.segment.ms, not the hours config. That's there for legacy reasons, but the ms configs are more consistent). As messages are received by Kafka, they are written to the current open log segment for each partition. That segment is rotated when either the log.segment.bytes or the log.segment.ms limit is reached. Once that happens, the log segment is closed and a new one is opened. Only after a log segment is closed can it be deleted via the retention settings. Once the log segment is closed AND either all the messages in the segment are older than log.retention.ms OR the total partition size is greater than log.retention.bytes, then the log segment is purged.
Link: How the retention works
There is one broker configuration, log.retention.check.interval.ms, that affects this test. Its default is 5 minutes, so the broker checks log segments every 5 minutes to see whether they can be deleted according to the retention policies.
topic1 had a retention policy set (retention.ms=60000), so if there was at least one message in an active segment of topic1, that segment would be closed and deleted once it had been idle for long enough. Since log.retention.check.interval.ms is a broker configuration, it is not affected by changes to the topic. Also, retention.ms has to elapse after the last message is produced to the segment. So after the last message is produced, the segment will be deleted no sooner than retention.ms milliseconds and no later than retention.ms + log.retention.check.interval.ms milliseconds.
So the "segment of just 35 bytes, which contained just one message, was deleted after the minute (maybe a little more)" happened because the retention check, by chance, ran almost immediately after the message was produced to that segment. The broker then only had to wait 60 seconds to be sure no new message would be produced to that segment (in which case the deletion wouldn't happen), and since there was none, it deleted the segment.
The other topic1 configurations didn't play much of a role here.
file.delete.delay.ms=40000 only defined that, once the broker has decided to delete the segment, it should wait 40 seconds before actually doing so.
segment.bytes=400000 didn't have any effect in this test. It defines that after 400000 bytes are stored in the segment, the segment should be closed and a new one rolled.
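To make the timing above concrete, here is a rough sketch of the arithmetic in plain shell. It uses only the values mentioned in this thread and assumes the broker is still on the 5 minute default for log.retention.check.interval.ms; the variable names are illustrative and are not Kafka settings.

```shell
# Values from this thread; the variable names are illustrative only.
retention_ms=60000            # topic-level retention.ms set on topic1
check_interval_ms=300000      # assumed broker default for log.retention.check.interval.ms (5 minutes)
file_delete_delay_ms=40000    # topic-level file.delete.delay.ms set on topic1

# Earliest case: a retention check runs right when the last message becomes retention.ms old.
earliest_ms=$retention_ms
# Latest case: a check ran just before that moment, so the next one is a full interval later.
latest_ms=$((retention_ms + check_interval_ms))

echo "segment chosen for deletion between $((earliest_ms / 1000))s and $((latest_ms / 1000))s after the last message"
echo "segment files removed about $((file_delete_delay_ms / 1000))s after that decision"
```

With these numbers the window runs from 60 to 360 seconds, so a deletion after "a minute, maybe a little more" falls at the lucky early end of the range.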
You misinterpret some of the statements you cite:
That segment is rotated when either the log.segment.bytes or the log.segment.ms limit is reached.
This clearly says rotation can be triggered by size or time. It's or, not and.
Once that happens, the log segment is closed. [...] Once the log segment is closed AND either all the messages in the segment are older than log.retention.ms OR the total partition size is greater than log.retention.bytes, then the log segment is purged.
Thus, after a segment has been closed by a time-triggered rotation, it can be deleted regardless of its size.
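The OR/AND structure of the quoted rule can be restated as two small shell predicates. This is only a sketch of the rule as quoted, not broker code: should_roll and can_purge are hypothetical helper names, the 604800000 ms (7 days) value stands in for a segment.ms that this thread never sets, and a negative retention.bytes is treated as "no size limit".

```shell
# Hypothetical helpers that restate the quoted rule; this is not broker code.

# Rotation: the segment is closed when EITHER the size OR the age limit is reached.
# Args: size_bytes age_ms segment_bytes segment_ms
should_roll() {
  [ "$1" -ge "$3" ] || [ "$2" -ge "$4" ]
}

# Purge: the segment must be closed AND at least one retention condition must hold.
# Args: is_closed(0|1) newest_message_age_ms retention_ms partition_bytes retention_bytes
# A negative retention_bytes is treated as "no size limit" (an assumption of this sketch).
can_purge() {
  [ "$1" -eq 1 ] || return 1
  [ "$2" -gt "$3" ] && return 0
  [ "$5" -ge 0 ] && [ "$4" -gt "$5" ]
}

# Size alone triggers rotation (segment.bytes=400000 reached on a brand-new segment).
should_roll 400000 0 400000 604800000 && echo "roll: size limit reached"

# Age alone triggers rotation too, even for a 35-byte segment.
should_roll 35 604800000 400000 604800000 && echo "roll: time limit reached"

# A closed 35-byte segment whose only message is 61 s old, with retention.ms=60000.
if can_purge 1 61000 60000 35 -1; then
  echo "purge: closed and older than retention.ms"
fi
```

Note that the segment size never appears in can_purge: once a segment is closed, only message age and total partition size decide whether it goes.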