
How can I set unlimited retention for a compacted topic in Kafka?

Tags:

apache-kafka

I am struggling to get a compacted topic working as expected.

I have a compacted topic, and messages are compacted properly, but once messages become older than the default retention period, they get deleted.

I want a compacted topic that keeps at least one value per key indefinitely.

How can I accomplish that? Do I have to override the retention period for that particular compacted topic? Or should a compacted topic keep one value per key indefinitely by default?

My config

log.cleaner.backoff.ms = 15000
log.cleaner.dedupe.buffer.size = 134217728
log.cleaner.delete.retention.ms = 1000
log.cleaner.enable = true
log.cleaner.io.buffer.load.factor = 0.9
log.cleaner.io.buffer.size = 524288
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
log.cleaner.min.cleanable.ratio = 0.001
log.cleaner.min.compaction.lag.ms = 0
log.cleaner.threads = 1
log.cleanup.policy = [compact, delete]

log.retention.bytes = -1
log.retention.check.interval.ms = 5000
log.retention.hours = 0
log.retention.minutes = 20
log.retention.ms = null
asked Nov 08 '18 21:11 by Greg Bala


2 Answers

I found a solution that I want to share. Unfortunately, the Kafka documentation is not very clear on this, so perhaps this will help someone:

Do not set this:

log.cleanup.policy = [compact, delete]

This setting means that all topics are, by default, both compacted and deleted. So your topic will get compacted as per the compaction rules, but when segments (messages) get older than the configured retention time (in my case 20 minutes), they get deleted as well.
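
To make the difference concrete, here is a minimal Python sketch of the two policies. It is a toy model only: the `Record` type and the `clean` function are hypothetical names made up for illustration, and real Kafka cleans whole log segments rather than individual records.

```python
from dataclasses import dataclass


@dataclass
class Record:
    key: str
    value: str
    timestamp_ms: int


def clean(log, policy, now_ms, retention_ms):
    """Toy model of log cleanup (hypothetical helper, not a Kafka API)."""
    records = list(log)
    if "compact" in policy:
        # Compaction keeps only the most recent record for each key.
        latest = {}
        for r in records:
            latest[r.key] = r
        records = [r for r in records if latest[r.key] is r]
    if "delete" in policy:
        # Time-based retention drops everything older than retention_ms,
        # including the last remaining value of a key.
        records = [r for r in records if now_ms - r.timestamp_ms <= retention_ms]
    return records


log = [
    Record("user-1", "a", 0),
    Record("user-1", "b", 60_000),
    Record("user-2", "x", 120_000),
]
retention_ms = 20 * 60 * 1000  # 20 minutes, as in the question
now_ms = 30 * 60 * 1000        # 30 minutes after the first record

compact_only = clean(log, {"compact"}, now_ms, retention_ms)
compact_delete = clean(log, {"compact", "delete"}, now_ms, retention_ms)

print([(r.key, r.value) for r in compact_only])    # [('user-1', 'b'), ('user-2', 'x')]
print([(r.key, r.value) for r in compact_delete])  # []
```

With compact alone, every key keeps its latest value regardless of age; with compact plus delete, the 20 minute retention removes those last values too.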

Set the default cleanup policy to:

log.cleanup.policy = compact
or 
log.cleanup.policy = delete

(log.cleanup.policy = delete is the default config)

"log.cleanup.policy = compact" means that topics are compacted by default.

If you choose this default policy, you do not need to make any further changes. There is no need to set the log.retention.* settings to -1 or any other value. Your topics will be compacted, and old messages are never deleted by retention; they are only removed as per the compaction rules.

"log.cleanup.policy = delete" means that topics will by default get pruned past retention time.

If you choose this default policy, then you will need to override the cleanup.policy per topic; that is, set cleanup.policy=compact explicitly on the topic. This switches that specific topic to compaction instead of deletion. You do not need to adjust the log.retention.* settings.
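
As a rough sketch of such a per-topic override, the Python snippet below only assembles the command line for the kafka-configs.sh tool; it does not run it. The topic name, the broker address, and the helper name `topic_override_command` are placeholders, and it assumes a Kafka release whose kafka-configs.sh accepts --bootstrap-server for topic configs (older releases used --zookeeper instead).

```python
import shlex


def topic_override_command(topic, overrides, bootstrap_server="localhost:9092"):
    """Build (but do not execute) a kafka-configs.sh call for topic-level overrides."""
    add_config = ",".join(f"{name}={value}" for name, value in overrides.items())
    return [
        "kafka-configs.sh",
        "--bootstrap-server", bootstrap_server,  # placeholder broker address
        "--entity-type", "topics",
        "--entity-name", topic,                  # placeholder topic name
        "--alter",
        "--add-config", add_config,
    ]


cmd = topic_override_command("my-compacted-topic", {"cleanup.policy": "compact"})
print(shlex.join(cmd))
```

Note that the topic-level setting is named cleanup.policy, without the log. prefix used for the broker-wide default.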

PS: Intuitively, I would expect the default "log.cleanup.policy = [compact, delete]" to be overridden when you specify "log.cleanup.policy = compact" on a per-topic basis, but this is not so. With "log.cleanup.policy = [compact, delete]" you are effectively changing how compacted topics work: compact becomes compact+delete.

PS2: If you have trouble testing and getting your topic to compact, note that only inactive (closed) log segments can be compacted; the active segment is never compacted. So for testing, set log.segment.bytes to something small, say 10000.
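
The effect of the segment size can be illustrated with another small Python model. This is a simplified sketch, not Kafka's actual cleaner: `append` and `compact_inactive` are hypothetical helpers, and the record size is a crude estimate based on string lengths.

```python
def append(segments, record, segment_bytes):
    """Append a (key, value) record to the active (last) segment, rolling when full."""
    size = len(record[0]) + len(record[1])  # crude size estimate (assumption)
    active = segments[-1]
    used = sum(len(k) + len(v) for k, v in active)
    if active and used + size > segment_bytes:
        segments.append([])  # roll a new active segment
    segments[-1].append(record)


def compact_inactive(segments):
    """Deduplicate keys across closed segments; leave the active segment untouched."""
    *closed, active = segments
    latest = {}
    for seg_idx, seg in enumerate(closed):
        for pos, (key, _) in enumerate(seg):
            latest[key] = (seg_idx, pos)
    cleaned = [
        [rec for pos, rec in enumerate(seg) if latest[rec[0]] == (seg_idx, pos)]
        for seg_idx, seg in enumerate(closed)
    ]
    return cleaned + [active]


def build_and_compact(records, segment_bytes):
    segments = [[]]
    for record in records:
        append(segments, record, segment_bytes)
    return compact_inactive(segments)


records = [("k", "v1"), ("k", "v2"), ("k", "v3")]

big = build_and_compact(records, segment_bytes=1000)  # everything stays in the active segment
small = build_and_compact(records, segment_bytes=3)   # every record rolls a new segment

print(big)    # [[('k', 'v1'), ('k', 'v2'), ('k', 'v3')]]
print(small)  # [[], [('k', 'v2')], [('k', 'v3')]]
```

With a large segment size nothing is compacted because all records sit in the active segment; with a tiny segment size the older duplicate in the closed segments is removed, while the record in the active segment is left alone.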

answered Sep 18 '22 11:09 by Greg Bala


Choose only "compact" as the cleanup policy, and set an infinite retention.

log.cleanup.policy = [compact]
log.retention.bytes = -1
log.retention.ms = -1
answered Sep 18 '22 11:09 by Christophe Quintard