
Log compaction to keep exactly one message per key

Tags:

apache-kafka

I want to create a topic that contains unique keys along with their most recent values, so that when a message with an existing key is written to the topic, the old message is removed.

To do so, I have configured the following parameters in the server.properties file:

log.cleaner.enable=true
log.cleanup.policy=compact

# The minimum age of a log file to be eligible for deletion due to age
log.retention.minutes=3

log.retention.bytes=10737418

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=60000

# The maximum time before a new log segment is rolled out (in milliseconds).
# If not set, the value in log.roll.hours is used
log.roll.ms=600000

With these settings I expected compaction to take place every 3 minutes. To test the compaction policy, I created a topic named retention_test:

kafka-topics --zookeeper localhost:2181 --create --topic retention_test --replication-factor 1 --partitions 1

Then, using the console producer

kafka-console-producer --broker-list localhost:9092 --topic retention_test --property parse.key=true --property key.separator=:

I produced the following messages:

>1:first
>2:second
>3:third

The console consumer (kafka-console-consumer --bootstrap-server localhost:9092 --topic retention_test --from-beginning) consumes them successfully:

first
second
third

Now, when I produce a message with a key that has already been added, the older message is not removed and remains in the topic.

On the producer's side:

>1:updatedFirst

Note that, to test this behavior, I restarted the consumer multiple times, long after the 3-minute retention period had passed. The output is:

first
second
third
updatedFirst

The desired output is:

second
third
updatedFirst

since first and updatedFirst have the same key.
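
To make the expected result concrete, here is a minimal sketch in plain Python of the "exactly one message per key" semantics described above. It does not talk to Kafka at all; the function name latest_per_key and the list of (key, value) pairs are hypothetical and exist only for illustration.

```python
def latest_per_key(records):
    """Keep only the last record for each key, preserving log order.

    `records` is a list of (key, value) pairs in the order they were produced.
    """
    # Remember the position of the most recent record for every key.
    last_position = {}
    for position, (key, _value) in enumerate(records):
        last_position[key] = position

    # A record survives only if it is the most recent one for its key.
    return [
        value
        for position, (key, value) in enumerate(records)
        if last_position[key] == position
    ]


# The four messages produced in the question, in production order.
produced = [
    ("1", "first"),
    ("2", "second"),
    ("3", "third"),
    ("1", "updatedFirst"),
]

print(latest_per_key(produced))  # ['second', 'third', 'updatedFirst']
```

This is the state I would like a consumer reading from the beginning to see.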

According to the docs:

Log compaction gives us a more granular retention mechanism so that we are guaranteed to retain at least the last update for each primary key

Is it possible to keep exactly one message (the most recent one) per key instead of at least one message (including the most recent one)?

Asked by Giorgos Myrianthous, Apr 23 '18 12:04


1 Answer

I'd say it's not generally possible. Kafka stores messages in segments for each partition of each topic. Each segment is a file that is only ever appended to (or deleted as a whole). Compaction works only by rewriting existing segment files, skipping the messages for which a later message with the same key exists. However, the head segment (the one to which new messages are currently being appended) is not compacted until a new segment is created and becomes the head segment.
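
The segment behaviour described above can be illustrated with a toy model in Python. This is only a sketch under simplifying assumptions, not Kafka's actual cleaner: the names compact and visible_values are hypothetical, each segment is modelled as a list of (offset, key, value) tuples, and cleaner thresholds such as the dirty ratio are ignored. The model also assumes that only closed segments are scanned when deciding which records are superseded.

```python
def compact(segments):
    """Compact all closed segments and leave the head (last) segment untouched.

    `segments` is a list of segments, oldest first; each segment is a list of
    (offset, key, value) tuples. Returns a new list of segments.
    """
    if not segments:
        return []
    closed, head = segments[:-1], segments[-1]

    # Assumption of this model: the latest offset per key is computed from
    # the closed segments only, so the head segment neither gets rewritten
    # nor causes older records to be dropped.
    latest = {}
    for segment in closed:
        for offset, key, _value in segment:
            latest[key] = offset

    # "Rewrite" each closed segment, skipping superseded records.
    cleaned = [
        [record for record in segment if latest[record[1]] == record[0]]
        for segment in closed
    ]
    return cleaned + [head]


def visible_values(segments):
    """Values a consumer reading from the beginning would see."""
    return [value for segment in segments for _offset, _key, value in segment]


records = [
    (0, "1", "first"),
    (1, "2", "second"),
    (2, "3", "third"),
    (3, "1", "updatedFirst"),
]

# Everything still sits in the head segment: nothing is compacted.
print(visible_values(compact([records])))
# ['first', 'second', 'third', 'updatedFirst']

# After a roll, the records are in a closed segment and the head is empty.
print(visible_values(compact([records, []])))
# ['second', 'third', 'updatedFirst']
```

In this model, the duplicate key stays visible for as long as the old record lives in the head segment, which matches the output you are seeing.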

The 3 minutes you configured via log.retention.minutes does not apply when log.cleanup.policy=compact; it is only effective when log.cleanup.policy=delete.

Why is having exactly one message for a given key important? Perhaps an alternative approach can be suggested if you provide more info about your use case.

Answered by Michal Borowiecki, Sep 19 '22 16:09