Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do I delete/clean Kafka queued messages without deleting Topic

Tags:

apache-kafka

Is there any way to delete queue messages without deleting Kafka topics?
I want to delete queue messages when activating the consumer.

I know there are several ways like:

  1. Resetting retention time

    $ ./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --config retention.ms=1000

  2. Deleting kafka files

    $ rm -rf /data/kafka-logs/<topic/Partition_name>

like image 453
Tachikoma Avatar asked Sep 14 '17 02:09

Tachikoma


People also ask

How do I clear message queue in Kafka?

The easiest way to purge or delete messages in a Kafka topic is by setting the retention.ms to a low value. retention.ms configuration controls how long messages should be kept in a topic. Once the age of the message in a topic hits the retention time the message will be removed from the topic.

How do I clear Kafka cache?

stop zookeeper & Kafka server, 2. then go to 'kafka-logs' folder , there you will see list of kafka topic folders, delete folder with topic name 3. go to 'zookeeper-data' folder , delete data inside that. 4. start zookeeper & kafka server again.

How long Kafka will keep the messages in the queue?

In Kafka queues, messages are retained for 1 to 72 hours, depending on what you choose when creating a queue. In Kafka premium instances, messages are retained for 1 to 168 hours, depending on what you choose when creating a topic.

How do I delete messages from compacted topic?

Deleting a message from a compacted topic is as simple as writing a new message to the topic with the key you want to delete and a null value. When compaction runs the message will be deleted forever. producer. send(new ProducerRecord(CUSTOMERS_TOPIC, “Customer123”, null));


1 Answers

In 0.11 or higher you can run the bin/kafka-delete-records.sh command to mark messages for deletion.

https://github.com/apache/kafka/blob/trunk/bin/kafka-delete-records.sh

For example, publish 100 messages

seq 100 | ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytest 

then delete 90 of those 100 messages with the new kafka-delete-records.sh command line tool

./bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file ./offsetfile.json 

where offsetfile.json contains

 {"partitions": [{"topic": "mytest", "partition": 0, "offset": 90}], "version":1 } 

and then consume the messages from the beginning to verify that 90 of the 100 messages are indeed marked as deleted.

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytest --from-beginning 91 92 93 94 95 96 97 98 99 100 
like image 140
Hans Jespersen Avatar answered Oct 04 '22 00:10

Hans Jespersen