I created a Kafka topic with the properties below:
min.cleanable.dirty.ratio=0.01,delete.retention.ms=100,segment.ms=100,cleanup.policy=compact
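For reference, a topic with these properties can be created like this (a sketch; the broker address and the topic name my-topic are assumptions):
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic \
  --partitions 1 --replication-factor 1 \
  --config cleanup.policy=compact \
  --config min.cleanable.dirty.ratio=0.01 \
  --config delete.retention.ms=100 \
  --config segment.ms=100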
Let's say I insert key-value pairs in this order: 1111:1, 1111:2, 1111:null, 2222:1. What happens is that log compaction runs on all but the last message, clearing the first two but retaining 1111:null.
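For example, the four records could be produced with kcat like this (a sketch; the broker and topic name are assumptions, and -Z turns the empty value into a NULL tombstone):
printf '1111:1\n1111:2\n' | kcat -b localhost:9092 -t my-topic -K:
echo '1111:' | kcat -b localhost:9092 -t my-topic -Z -K:
echo '2222:1' | kcat -b localhost:9092 -t my-topic -K: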
According to the documentation:
Kafka log compaction also allows for deletes. A message with a key and a null payload acts like a tombstone, a delete marker for that key. Tombstones get cleared after a period.
So I expected that once delete.retention.ms had elapsed, the null marker would delete the message with key 1111.
I have two questions:
1) Why is the tombstone marker not working?
2) Why is the last message ignored by compaction?
This is what the server.properties file has:
log.retention.ms=100
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=100
log.cleaner.delete.retention.ms=100
log.cleaner.enable=true
log.cleaner.min.cleanable.ratio=0.01
One of the neat things that Kafka does with its messages is the concept of tombstone messages. These are messages with a null value. They're usually used in conjunction with a key to indicate the logical deletion of a record.
You can also use message-expiry settings to have Kafka automatically purge messages after a certain amount of time, or use a special tool called kafka-delete-records.sh, which lets you remove messages by specifying a partition and an offset.
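For illustration, a sketch of how kafka-delete-records.sh is typically invoked (the topic, partition, and offset here are made-up values; the tool deletes every record before the given offset):
cat > delete-records.json <<'EOF'
{"partitions": [{"topic": "my-topic", "partition": 0, "offset": 3}], "version": 1}
EOF
kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file delete-records.json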
It is not possible to remove a single message from a Kafka topic, even though you know its partition and offset. Keep in mind that Kafka is not a key/value store; a topic is rather an append-only(!) log that represents a stream of data.
Purging of messages in Kafka is done automatically, either by specifying a retention time for a topic or by defining a disk quota for it. So for your case of one 5GB file, this file will be deleted after the retention period you define has passed, regardless of whether it has been consumed or not.
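For example, retention can be set per topic with kafka-configs.sh (a sketch; the topic name and the values are assumptions):
kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type topics --entity-name my-topic \
  --add-config retention.ms=604800000,retention.bytes=5368709120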
Tombstone records are preserved longer by design. The reason is that brokers don't track consumers. Assume that a consumer goes offline for some time after reading the first record. While the consumer is down, log compaction kicks in. If log compaction deleted the tombstone record, the consumer would never learn that the record was deleted. If the consumer implements a cache, the record might never get deleted there. Thus, tombstones are preserved longer, to allow offline consumers to receive all tombstones for local cleanup.
Tombstones will be deleted only after delete.retention.ms (the default value is 1 day). Note: this is a topic-level configuration; the broker-level default is log.cleaner.delete.retention.ms (which your server.properties already sets to 100). Thus, you need to set the config per topic if you want to change it for a single topic.
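For example, to change it for a single topic (a sketch; the topic name and value are assumptions):
kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type topics --entity-name my-topic \
  --add-config delete.retention.ms=100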
A compacted topic has two portions:
1) Cleaned portion: the portion of the Kafka log that has been cleaned by the Kafka cleaner at least once.
2) Dirty portion: the portion of the Kafka log that has not yet been cleaned by the Kafka cleaner. Kafka maintains a dirty offset; all messages with offset >= dirty offset belong to the dirty portion.
Note: the Kafka cleaner cleans all segments (irrespective of whether a segment is in the cleaned or dirty portion) and re-copies them every time the dirty ratio reaches min.cleanable.dirty.ratio.
Tombstones are deleted segment-wise. The tombstones in a segment are deleted only if the segment satisfies both conditions below:
1) The segment is in the cleaned portion of the log.
2) The last-modified time of the segment is <= (the last-modified time of the segment containing the message at offset (dirty offset - 1)) - delete.retention.ms.
The second condition is hard to unpack, but in simple terms it implies that the segment size should be equal to log.segment.bytes/segment.bytes (1GB by default). For a segment (in the cleaned portion) to reach 1GB, you need to produce a large number of messages with distinctive keys. But you produced only 4 messages, 3 of which share the same key. That is why the tombstone is not deleted from the segment containing the 1111:null message: the segment doesn't satisfy the second condition above.
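To see which records actually survive in each segment, you can dump the log files on the broker; this is a sketch, and the paths depend on your log.dirs setting:
kafka-dump-log.sh --print-data-log \
  --files /var/kafka-logs/my-topic-0/00000000000000000000.log
# the cleaner checkpoints its dirty offsets per partition here:
cat /var/kafka-logs/cleaner-offset-checkpoint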
You have two options to get the tombstones deleted in this 4-message scenario: reduce segment.bytes/log.segment.bytes to a small value so that even a tiny segment counts as full, or produce enough additional messages with distinctive keys to fill the segment.
Source Code (Extra Reading): https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogCleaner.scala
try {
  // clean segments into the new destination segment
  for (old <- segments) {
    // a segment keeps its tombstones only if it was modified after the delete horizon
    val retainDeletes = old.lastModified > deleteHorizonMs
    info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
      .format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset,
        if (retainDeletes) "retaining" else "discarding"))
    cleanInto(log.topicPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, stats)
  }
  // ... (remainder of the method: swapping in the cleaned segment, error handling)
The algorithm for removing tombstones in a compacted topic is supposed to work as follows. From the compaction perspective, the Kafka log contains three parts: the cleaned tail, the dirty head, and the active segment, which is never cleaned. It's possible that the tombstones are still in the dirty portion of the log and hence not cleared; producing a few more messages with different keys should push the tombstones into the cleaned portion of the log and get them deleted, as sketched below.
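A minimal sketch of that workaround, assuming kcat and a topic named my-topic (the filler keys are arbitrary; the sleep gives segment.ms=100 time to roll new segments):
for i in $(seq 1 20); do
  echo "filler-$i:1" | kcat -b localhost:9092 -t my-topic -K:
  sleep 0.2
done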
Kafka documentation:
Note that this compaction deadline is not a hard guarantee since it is still subjected to the availability of log cleaner threads and the actual compaction time. You will want to monitor the uncleanable-partitions-count, max-clean-time-secs and max-compaction-delay-secs metrics.
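One way to read such a metric is the JmxTool bundled with Kafka, assuming JMX is enabled on the broker (the port 9999 and the MBean name below are assumptions based on the documented kafka.log metrics):
kafka-run-class.sh kafka.tools.JmxTool \
  --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
  --object-name kafka.log:type=LogCleanerManager,name=uncleanable-partitions-count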
As @yatishby pointed out, if your tombstone is in the active segment or the log head, the compaction won't be triggered.
Therefore, I tried to flush dozens of messages to the topic and move the tombstone from the log head to the log tail.
Then it worked: the tombstones were successfully removed.
Here are the steps to reproduce it.
I created a test topic with the same configuration as yours, then sent tombstones to Kafka. Even though delete.retention.ms=100 was specified, the tombstones stayed there; it seemed not to be working.
# do this about 20 times, with different keys:
echo "g:1" | kcat -b localhost:9092 -t test -Z -K:
echo "j:1" | kcat -b localhost:9092 -t test -Z -K:
kcat is a very convenient tool for producing messages to Kafka.
How to produce a tombstone to Kafka using kcat (-Z sends the empty value as NULL):
echo "yourkey:" | kcat -b localhost:9092 -t test -Z -K:
How to consume messages from a topic:
kafka-console-consumer --bootstrap-server localhost:9092 --topic test \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.timestamp=true \
--property print.key=true \
--property print.value=true