
Kafka-streams: setting internal topics cleanup policy to delete doesn't work

I use the Kafka Streams reduce function, and it creates a state store changelog internal topic (like app-KSTREAM-REDUCE-STATE-STORE-0000000002-changelog).

I wanted to set retention bytes and change the cleanup policy to delete to prevent the storage from filling up, so I set the following configs in the Kafka Streams code:

Properties props = new Properties();
// The "topic." prefix (StreamsConfig.TOPIC_PREFIX) makes Streams apply these topic-level configs to the internal topics it creates
props.put(StreamsConfig.TOPIC_PREFIX + TopicConfig.RETENTION_BYTES_CONFIG, Constants.INTERNAL_TOPICS_RETENTION_BYTES);
props.put(StreamsConfig.TOPIC_PREFIX + TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
KafkaStreams streams = new KafkaStreams(builder.build(), props);

However, when a new internal topic is generated, only the retention config is applied to it; the cleanup policy remains compact.
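For reference, this is a minimal sketch (not from the original post) of how the configs actually applied to the changelog topic can be checked with the plain Java AdminClient; the bootstrap address and the topic name are placeholders:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class DescribeChangelogConfigs {
    public static void main(String[] args) throws Exception {
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address

        try (AdminClient admin = AdminClient.create(adminProps)) {
            // Placeholder: use the changelog topic name that Streams actually created
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC,
                    "app-KSTREAM-REDUCE-STATE-STORE-0000000002-changelog");

            Config config = admin.describeConfigs(Collections.singleton(topic)).all().get().get(topic);

            // Print the settings the broker actually applied to the internal topic
            System.out.println("cleanup.policy  = " + config.get("cleanup.policy").value());
            System.out.println("retention.bytes = " + config.get("retention.bytes").value());
        }
    }
}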

Is there a missing step, or is it not possible to set the cleanup policy of internal topics to delete?

I use Kafka version 1.0.0 and kafka-streams version 1.0.0.

Asked Sep 02 '18 by Amir Masud Zare Bidaki

People also ask

What is Kafka cleanup policy?

Kafka stores messages for a set amount of time and purges messages older than the retention period.

What is the default value of Kafka topic property cleanup policy?

The default policy ("delete") will discard old segments when their retention time or size limit has been reached.

How do I clear Kafka stream?

You can use either of these methods: call the API method KafkaStreams#cleanUp() in your application code, or manually delete the corresponding local state directory (default location: /var/lib/kafka-streams/<application.id>). For more information, see the state.dir configuration.
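As a rough illustration of the first option (the builder and props stand in for whatever the application already uses), note that cleanUp() may only be called while the instance is not running:

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.cleanUp();   // wipes the local state directory for this application.id; only valid before start() or after close()
streams.start();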

What are the strategies used by Kafka to clean up its old log segments?

Delete and compact both: we can specify both delete and compact values for the cleanup.policy configuration at the same time. In this case, the log is compacted, but the cleanup process also follows the retention time or size limit settings.
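As an illustrative sketch (not from the original post), the combined policy could be requested for Streams-managed internal topics with the same "topic." prefix used in the question; the retention value here is a placeholder:

// Request both compaction and deletion for internal topics created by Streams
props.put(StreamsConfig.TOPIC_PREFIX + TopicConfig.CLEANUP_POLICY_CONFIG,
        TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE);
props.put(StreamsConfig.TOPIC_PREFIX + TopicConfig.RETENTION_BYTES_CONFIG, "1073741824"); // placeholder: 1 GiB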


1 Answer

Thanks to Guozhang for his answer on the Kafka mailing list:

The issue you described seems like an old bug that is resolved since 1.1.0 (as part of the fix in https://jira.apache.org/jira/browse/KAFKA-6150).

... You do not need to upgrade broker in order to use newer Streams library versions.

Upgrading the kafka-streams version to 1.1.0 fixed the issue.

Answered Sep 28 '22 by Amir Masud Zare Bidaki