I want to create a topic which contains unique keys along with their corresponding most recent values, so that when a message with an existing key is inserted into the topic, the older message is removed.
To do so, I have configured the following parameters in the server.properties file:
log.cleaner.enable=true
log.cleanup.policy=compact
# The minimum age of a log file to be eligible for deletion due to age
log.retention.minutes=3
log.retention.bytes=10737418
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=60000
# The maximum time before a new log segment is rolled out (in milliseconds).
# If not set, the value in log.roll.hours is used
log.roll.ms=600000
With these settings, compaction should take place every 3 minutes. In order to test the compaction policy, I have created a topic retention_test:
kafka-topics --zookeeper localhost:2181 --create --topic retention_test --replication-factor 1 --partitions 1
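Since I don't pass any per-topic --config overrides here, the topic should inherit the broker defaults above. As a sanity check (assuming a Kafka version whose kafka-configs still accepts --zookeeper, matching the kafka-topics call above), the effective per-topic overrides can be listed with:
kafka-configs --zookeeper localhost:2181 --describe --entity-type topics --entity-name retention_test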
Then, using the console producer
kafka-console-producer --broker-list localhost:9092 --topic retention_test --property parse.key=true --property key.separator=:
I have produced the following messages:
>1:first
>2:second
>3:third
The console consumer
kafka-console-consumer --bootstrap-server localhost:9092 --topic retention_test --from-beginning
consumes them successfully:
first
second
third
Now when I produce a message with a key that has already been used, the older message doesn't seem to be removed and remains in the topic.
On the producer's side:
>1:updatedFirst
Note that in order to test the behavior, I have restarted the consumer multiple times, long after the retention period of 3 minutes has passed. The output is:
first
second
third
updatedFirst
The desired output should have been:
second
third
updatedFirst
since first and updatedFirst have the same key.
According to the docs:
Log compaction gives us a more granular retention mechanism so that we are guaranteed to retain at least the last update for each primary key
Is it possible to keep exactly one message (the most recent one) per key instead of at least one message (including the most recent one)?
What is a log compacted topic? The Kafka documentation says: Log compaction is a mechanism to give finer-grained per-record retention, rather than the coarser-grained time-based retention. The idea is to selectively remove records where we have a more recent update with the same primary key.
Topic config for log compaction: to turn on compaction for a topic, use the topic config cleanup.policy=compact (log.cleanup.policy is the equivalent broker-wide default). To delay compaction of records after they are written, use the topic config min.compaction.lag.ms (the broker-wide counterpart is log.cleaner.min.compaction.lag.ms).
max.compaction.lag.ms is the maximum delay between the time a message is written and the time it becomes eligible for compaction. It overrides min.cleanable.dirty.ratio: the cleaner will compact the log even if the dirty ratio has not been reached.
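Putting those pieces together, a compacted topic could be created with the policy set per-topic rather than broker-wide; a minimal sketch (the topic name and values here are illustrative, not from the question):
kafka-topics --zookeeper localhost:2181 --create --topic compaction_demo --replication-factor 1 --partitions 1 --config cleanup.policy=compact --config min.cleanable.dirty.ratio=0.01 --config segment.ms=10000
With min.cleanable.dirty.ratio=0.01 the cleaner kicks in almost as soon as there is anything to compact, and segment.ms=10000 rolls segments every 10 seconds so records leave the active segment quickly.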
I'd say it's not generally possible. Kafka stores messages in segments for each partition of each topic. Each segment is a file and they are only ever appended to (or deleted as a whole). Compaction only works by re-writing the existing segment files skipping the messages for which there are later messages with the same key. However, the head segment (the one to which currently new messages are being appended) is not compacted (until a new segment is created which becomes the head segment).
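In this test, that means first, second and third most likely still sit in the head (active) segment, so the cleaner never touches them. One way to make them eligible, shown here only as a sketch (these exact overrides are not part of the answer), is to force frequent segment rolls on the topic and then produce a few more messages:
kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name retention_test --add-config segment.ms=10000,min.cleanable.dirty.ratio=0.01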
The 3 minutes you configured via the log.retention config are not in play when log.cleanup.policy=compact; they are only effective when log.cleanup.policy=delete.
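For completeness: the two policies can also be combined (supported since Kafka 0.10.1), in which case the log is both compacted and trimmed by the retention settings. A sketch of the corresponding server.properties line, not something the answer proposes:
log.cleanup.policy=compact,delete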
Why is having exactly one message for a given key important? Perhaps an alternative approach can be suggested if you provide more info about your use case.