Kafka log-compacted topic: duplicate values for the same key are not deleted

Log compacted topics are not supposed to keep duplicates for the same key, but in our case, when a new value with the same key is sent, the previous one isn't deleted. What could be the issue?

val TestCompactState: KTable[String, TestCompact] =
  builder.table[String, TestCompact](
    kafkaStreamConfigs.getString("testcompact-source"),
    TestCompactmaterialized.withKeySerde(stringSerde).withValueSerde(TestCompactSerde)
  )

What I get (actual result):

Offsets      Keys        Messages
5            {"id":5}   {"id":5,"namee":"omer","__deleted":"false"}
6            {"id":5}   {"id":5,"namee":"d","__deleted":"false"}

I want only the latest record for each key (expected result):

6            {"id":5}   {"id":5,"namee":"d","__deleted":"false"}
asked Apr 10 '20 by Danish

People also ask

How does log compaction work in Kafka?

Log compaction is a mechanism to give finer-grained per-record retention, rather than the coarser-grained time-based retention. The idea is to selectively remove records where we have a more recent update with the same primary key. This way the log is guaranteed to have at least the last state for each key.

What is log cleaner in Kafka?

LogCleaner manages one or more CleanerThreads that remove obsolete records from logs using the compact retention strategy. It is created by LogManager when the enableCleaner flag of its CleanerConfig is enabled, which is the default.

What is a compacted topic Kafka?

Topic compaction is a data retention mechanism that keeps at least the most recent value for each record key, instead of discarding records purely by age or size.

What is max.compaction.lag.ms?

max.compaction.lag.ms is the maximum delay between the time a message is written and the time it becomes eligible for compaction. When set, it takes precedence over the min.cleanable.dirty.ratio threshold for triggering compaction.


2 Answers

There could be several reasons for this behavior. The compaction cleanup policy does not run after every single incoming message. Instead, it is governed by the broker configuration:

log.cleaner.min.compaction.lag.ms: The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted.

Type: long; Default: 0; Valid Values: ; Update Mode: cluster-wide

This defaults to 0, so it is probably not the cause, but it is worth checking.

It is important to note that the compaction policy never compacts the current (active) segment. Messages are eligible for compaction only in inactive segments. Make sure to check:

log.segment.bytes: The maximum size of a single log file

Type: int; Default: 1073741824; Valid Values: [14,...]; Update Mode: cluster-wide

Compaction is usually triggered by the amount of data sitting in the "dirty" part of the log; the term "dirty" means uncleaned/uncompacted. There is another configuration that helps steer the compaction:

log.cleaner.min.cleanable.ratio: The minimum ratio of dirty log to total log for a log to be eligible for cleaning. If the log.cleaner.max.compaction.lag.ms or the log.cleaner.min.compaction.lag.ms configurations are also specified, then the log compactor considers the log eligible for compaction as soon as either: (i) the dirty ratio threshold has been met and the log has had dirty (uncompacted) records for at least the log.cleaner.min.compaction.lag.ms duration, or (ii) the log has had dirty (uncompacted) records for at most the log.cleaner.max.compaction.lag.ms period.

Type: double; Default: 0.5; Valid Values: ;Update Mode: cluster-wide

By default, the maximum lag after which a message must become eligible for compaction is practically unlimited, as the following configuration description shows.

log.cleaner.max.compaction.lag.ms: The maximum time a message will remain ineligible for compaction in the log. Only applicable for logs that are being compacted.

Type: long; Default: 9223372036854775807; Valid Values: ; Update Mode: cluster-wide
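
If you want compaction to kick in much sooner (e.g. while testing), these settings can be overridden at the topic level. Below is a minimal sketch using the Java AdminClient from Scala 2.13; the broker address, the topic name (standing in for whatever kafkaStreamConfigs.getString("testcompact-source") resolves to), and the specific values are assumptions for illustration only and are far too aggressive for production:

import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

object MakeCompactionAggressive extends App {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker address
  val admin = AdminClient.create(props)

  // Topic-level overrides so records become eligible for compaction quickly
  // (illustrative values only, not production settings)
  val topic = new ConfigResource(ConfigResource.Type.TOPIC, "testcompact-source") // assumed topic name
  val ops: java.util.Collection[AlterConfigOp] = List(
    new AlterConfigOp(new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET),
    new AlterConfigOp(new ConfigEntry("segment.bytes", "1048576"), AlterConfigOp.OpType.SET),          // small segments, so the active segment rolls sooner
    new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.01"), AlterConfigOp.OpType.SET), // clean even when only a little of the log is dirty
    new AlterConfigOp(new ConfigEntry("max.compaction.lag.ms", "60000"), AlterConfigOp.OpType.SET)     // force eligibility after one minute
  ).asJava

  admin.incrementalAlterConfigs(Map(topic -> ops).asJava).all().get()
  admin.close()
}

Remember that even with these overrides, the active segment is never compacted, so a brief window with duplicates for the same key is always possible.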

To summarize, there could be several reasons why you are observing the behavior you described. It is also important to be aware that a compacted topic does not guarantee the absence of duplicate messages for the same key; it only guarantees that at least the latest message for each key is retained.

There is a nice blog post that explains log compaction in more detail.

answered Sep 23 '22 by Michael Heil


As far as I know, it is not possible to configure a log compaction policy so that exactly one message per key is kept. Even if you set cleanup.policy=compact (topic level) or log.cleanup.policy=compact (broker level), there is no guarantee that only the latest message will be kept and that older ones will be compacted away.

According to the official Kafka documentation:

Log compaction gives us a more granular retention mechanism so that we are guaranteed to retain at least the last update for each primary key
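
In practice, if the goal is simply to read only the latest value per key, the KTable from the question already provides that: its materialized state store holds exactly one value per key, regardless of how many older records are still sitting in the compacted topic. Here is a minimal sketch of querying that store, assuming Kafka Streams 2.5+, a running KafkaStreams instance named streams, and that the Materialized instance was created with the hypothetical store name "testcompact-store":

import org.apache.kafka.streams.{KafkaStreams, StoreQueryParameters}
import org.apache.kafka.streams.state.{QueryableStoreTypes, ReadOnlyKeyValueStore}

// "testcompact-store" is the (assumed) name passed to Materialized.as(...) in the question's topology
def latestValue(streams: KafkaStreams, key: String): Option[TestCompact] = {
  val store: ReadOnlyKeyValueStore[String, TestCompact] =
    streams.store(
      StoreQueryParameters.fromNameAndType(
        "testcompact-store",
        QueryableStoreTypes.keyValueStore[String, TestCompact]()
      )
    )
  Option(store.get(key)) // the store keeps exactly one (the latest) value per key
}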

answered Sep 24 '22 by Giorgos Myrianthous