 

Kafka not deleting key with tombstone

Tags:

apache-kafka

I created a Kafka topic with the properties below:

min.cleanable.dirty.ratio=0.01,delete.retention.ms=100,segment.ms=100,cleanup.policy=compact
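
For reference, a topic with these overrides could be created like this (a minimal sketch; the topic name test, a local broker, and a Kafka version new enough to support --bootstrap-server are assumptions):

    kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test \
      --partitions 1 --replication-factor 1 \
      --config min.cleanable.dirty.ratio=0.01 \
      --config delete.retention.ms=100 \
      --config segment.ms=100 \
      --config cleanup.policy=compact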

Let's say I insert k-v pairs in the order 1111:1, 1111:2, 1111:null, 2222:1. What happens is that log compaction runs on all but the last message, clears the first two, but retains 1111:null.
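
Those four records can be produced, for example, with kcat (used later in this thread); with -Z an empty value is sent as a null payload, i.e. a tombstone. The topic name test is an assumption:

    # keyed records, ':' as the key separator
    printf '1111:1\n1111:2\n' | kcat -b localhost:9092 -t test -K:
    # -Z turns the empty value into a null payload (tombstone)
    echo "1111:" | kcat -b localhost:9092 -t test -Z -K:
    echo "2222:1" | kcat -b localhost:9092 -t test -K: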

According to the documentation:

Kafka log compaction also allows for deletes. A message with a key and a null payload acts like a tombstone, a delete marker for that key. Tombstones get cleared after a period.

So I am expecting that once delete.retention.ms has elapsed, the null marker should delete the message with key 1111.

I have two questions: why is the tombstone marker not working, and why is the last message excluded from compaction?

This is what the server.properties file has:

log.retention.ms=100
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=100
log.cleaner.delete.retention.ms=100
log.cleaner.enable=true
log.cleaner.min.cleanable.ratio=0.01
Asked Oct 08 '17 by Sam


People also ask

What is tombstone message in Kafka?

One of the neat things that Kafka does with its messages is the concept of tombstone messages. These are messages with a null value. They're usually used in conjunction with a key to indicate the logical deletion of a record.

How do I delete a record on Kafka?

You can use message expiry settings to have Kafka automatically purge messages after a certain amount of time, or use a special tool called kafka-delete-records.sh, which lets you remove messages up to a specified offset.
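
For illustration, a hedged sketch of kafka-delete-records.sh (topic test, partition 0, and a local broker are assumptions); it deletes all records before the given offset:

    # remove everything before offset 3 in partition 0 of topic test
    echo '{"partitions": [{"topic": "test", "partition": 0, "offset": 3}], "version": 1}' > delete-records.json
    kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file delete-records.json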

How do I delete specific messages from Kafka?

It is not possible to remove a single message from a Kafka topic, even though you know its partition and offset. Keep in mind that Kafka is not a key/value store; a topic is rather an append-only(!) log that represents a stream of data.

Does Kafka delete message after consume?

Purging of messages in Kafka is done automatically, by either specifying a retention time for a topic or defining a disk quota for it. So in the case of a single 5 GB log file, that file will be deleted after the retention period you define has passed, regardless of whether it has been consumed or not.


4 Answers

Tombstone records are preserved longer by design. The reason is that brokers don't track consumers. Assume that a consumer goes offline for some time after reading the first record. While the consumer is down, log compaction kicks in. If log compaction deleted the tombstone record, the consumer would never learn that the record was deleted. If the consumer implements a cache, the record might never get deleted locally. Thus, tombstones are preserved longer to allow offline consumers to receive all tombstones for local cleanup.

Tombstones will be deleted only after delete.retention.ms (the default value is 1 day). Note: delete.retention.ms is a topic-level configuration; the broker-level default backing it is log.cleaner.delete.retention.ms, which your server.properties already sets. To change the behaviour for a single topic, set the config on that topic.
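
To change it for a single topic, you can use kafka-configs, for example (the topic name test is an assumption):

    kafka-configs.sh --bootstrap-server localhost:9092 --alter \
      --entity-type topics --entity-name test \
      --add-config delete.retention.ms=100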

Answered Sep 30 '22 by Matthias J. Sax


A compacted topic has two portions:

1) Cleaned portion: the portion of the Kafka log that has been cleaned by the Kafka cleaner at least once.

2) Dirty portion: the portion of the Kafka log that has not been cleaned by the Kafka cleaner even once so far. Kafka maintains a dirty offset; all messages with offset >= dirty offset belong to the dirty portion.

Note: the Kafka cleaner cleans all segments (irrespective of whether the segment is in the cleaned or dirty portion) and re-copies them every time the dirty ratio reaches min.cleanable.dirty.ratio.

Tombstones are deleted segment-wise. Tombstones in a segment are deleted if the segment satisfies the conditions below:

  1. The segment should be in the cleaned portion of the log.

  2. The last modified time of the segment should be <= (last modified time of the segment containing the message with offset = (dirty offset - 1)) - delete.retention.ms.

The second condition is difficult to elaborate, but in simple terms it implies that the segment size should reach log.segment.bytes/segment.bytes (1 GB by default). For a segment in the cleaned portion to reach 1 GB, you need to produce a large number of messages with distinct keys. But you produced only 4 messages, with 3 of them having the same key. That is why the tombstone is not deleted in the segment containing the 1111:null message (the segment doesn't satisfy the second condition above).

With only 4 messages, you have two options to get the tombstone deleted (applied via kafka-configs, as sketched below):

  1. make delete.retention.ms=0, or
  2. make log.segment.bytes/segment.bytes=50.
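
A minimal sketch of applying either option with kafka-configs (the topic name test is an assumption):

    # option 1: remove the tombstone retention delay entirely
    kafka-configs.sh --bootstrap-server localhost:9092 --alter \
      --entity-type topics --entity-name test --add-config delete.retention.ms=0

    # option 2: shrink segments so they roll after a handful of messages
    kafka-configs.sh --bootstrap-server localhost:9092 --alter \
      --entity-type topics --entity-name test --add-config segment.bytes=50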

Source Code (Extra Reading): https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogCleaner.scala

try {
      // clean segments into the new destination segment
      for (old <- segments) {
        // retain tombstones while the segment's last-modified time is newer
        // than the delete horizon (see the second condition above)
        val retainDeletes = old.lastModified > deleteHorizonMs
        info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
            .format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
        cleanInto(log.topicPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, stats)
      }
Answered Sep 30 '22 by Hemanth


The algorithm for removing the tombstone in a compacted topic is supposed to be the following.

  1. The tombstone is never removed while it's still in the dirty portion of the log.
  2. Once the tombstone is in the cleaned portion of the log, its removal is further delayed by delete.retention.ms from the time it entered the cleaned portion.

It's possible that the tombstones are still in the dirty portion of the log and hence not cleared. Producing a few more messages with different keys should push the tombstones into the cleaned portion of the log, after which they will be deleted (a quick sketch follows).
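
A quick way to try this (a sketch; kcat and a topic named test are assumptions) is to produce a batch of records with fresh keys:

    # 20 records with distinct keys, enough to roll the segment and
    # push the tombstones into the cleaned portion
    for i in $(seq 1 20); do
      echo "k$i:1" | kcat -b localhost:9092 -t test -K:
    done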

Answered Sep 30 '22 by yatishby


From the compaction perspective, the Kafka log contains three parts:

  1. the active segment, to which new records are appended.
  2. the log head, which has not yet been compacted.
  3. the log tail, which has been compacted or is being compacted.

[Figure: a partition's log divided into the log tail (compacted), the log head (not yet compacted), and the active segment]

Kafka documentation:

Note that this compaction deadline is not a hard guarantee since it is still subjected to the availability of log cleaner threads and the actual compaction time. You will want to monitor the uncleanable-partitions-count, max-clean-time-secs and max-compaction-delay-secs metrics.

As @yatishby pointed out, if your tombstone is in the active segment or the log head, it won't be removed by compaction.

Therefore, I tried to flush dozens of messages to the topic and move the tombstone from the log head to the log tail.

Then it worked: the tombstones were successfully removed.

Here are the steps to reproduce it.

Step 1: send a tombstone to Kafka

I created a test topic with the same configuration as yours, then sent tombstones to Kafka. Even though delete.retention.ms=100 was specified, as you can see, the tombstones stayed there; it seemed not to be working.

[Screenshot: consumer output still showing the tombstone records]

Step 2: flush identical messages to Kafka

# do it 20 times, so the segment containing the tombstone rolls
for i in $(seq 1 20); do
  echo "g:1" | kcat -b localhost:9092 -t test -Z -K:
  echo "j:1" | kcat -b localhost:9092 -t test -Z -K:
done

kcat is a very convenient tool for producing messages to Kafka.

Step 3: the tombstones have disappeared

[Screenshot: consumer output with the tombstones gone]

Cheatsheet

  1. How to produce a tombstone to Kafka using kcat?

    echo "yourkey:" | kcat -b localhost:9092 -t test -Z -K:
    
  2. How to consume messages from a topic?

    kafka-console-consumer --bootstrap-server localhost:9092 --topic test \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.timestamp=true \
    --property print.key=true \
    --property print.value=true
    
Answered Sep 30 '22 by Ryan Lyu