Kafka Streams Persistent Store cleanup

Is there some explicit cleanup needed to prevent each persistent store from growing too large? I am currently using persistent stores to calculate aggregations with the DSL API.

asked Dec 13 '18 by Saravanan Setty

1 Answer

We had a similar issue and simply scheduled a punctuation job in our processor/transformer to clean up the store. Just implement your own isDataOld(nextValue) check and you are good to go.

@Override
public void init(ProcessorContext context) {
    this.context = context;
    this.kvStore = (KeyValueStore<Key, Value>) context.getStateStore("KV_STORE_NAME");
    // every 60 seconds of stream time, scan the store and delete stale entries
    this.context.schedule(Duration.ofSeconds(60), PunctuationType.STREAM_TIME, timestamp -> {
        try (KeyValueIterator<Key, Value> iterator = kvStore.all()) {
            while (iterator.hasNext()) {
                KeyValue<Key, Value> nextValue = iterator.next();
                if (isDataOld(nextValue)) {
                    kvStore.delete(nextValue.key);
                }
            }
        }
    });
}
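
For a bit more context, here is a minimal, self-contained sketch of how such a cleanup transformer could look and how it might be attached to a DSL topology. The class name CleanupTransformer, the String/Long key and value types, the 24-hour retention, and the timestamp-based isDataOld check are illustrative assumptions, not part of the original answer; only the store name "KV_STORE_NAME" and the punctuation approach come from the snippet above.

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical transformer: forwards records unchanged, remembers the latest
// timestamp per key, and periodically purges entries older than MAX_AGE_MS.
public class CleanupTransformer implements Transformer<String, Long, KeyValue<String, Long>> {

    private static final long MAX_AGE_MS = Duration.ofHours(24).toMillis(); // assumed retention

    private ProcessorContext context;
    private KeyValueStore<String, Long> kvStore; // value is a timestamp in this sketch

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.kvStore = (KeyValueStore<String, Long>) context.getStateStore("KV_STORE_NAME");
        // every 60 seconds of stream time, scan the store and delete stale entries
        context.schedule(Duration.ofSeconds(60), PunctuationType.STREAM_TIME, timestamp -> {
            try (KeyValueIterator<String, Long> iterator = kvStore.all()) {
                while (iterator.hasNext()) {
                    KeyValue<String, Long> entry = iterator.next();
                    if (isDataOld(entry, timestamp)) {
                        kvStore.delete(entry.key);
                    }
                }
            }
        });
    }

    // "old" here means the stored timestamp lags stream time by more than MAX_AGE_MS
    private boolean isDataOld(KeyValue<String, Long> entry, long streamTime) {
        return streamTime - entry.value > MAX_AGE_MS;
    }

    @Override
    public KeyValue<String, Long> transform(String key, Long value) {
        kvStore.put(key, value);          // remember the latest value/timestamp per key
        return KeyValue.pair(key, value); // pass the record through unchanged
    }

    @Override
    public void close() { }
}

Wiring it into a DSL topology could then look roughly like this (StreamsBuilder builder and KStream<String, Long> stream assumed):

builder.addStateStore(Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("KV_STORE_NAME"),
        Serdes.String(), Serdes.Long()));
stream.transform(CleanupTransformer::new, "KV_STORE_NAME");
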
answered Nov 15 '22 by FrancescoM