 

KTable state store persistence

If I use a persistent store when materializing a KTable, will the state store be persistent across application restarts? For example, if I use the following:

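import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;

StreamsBuilder builder = new StreamsBuilder();
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore("queryable-store-name"); // RocksDB-backed store
KTable<Long, String> table = builder.table(
    "foo",
    Materialized.<Long, String>as(storeSupplier) // explicit type arguments so the serde setters type-check
        .withKeySerde(Serdes.Long())
        .withValueSerde(Serdes.String()));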

Will the state store "queryable-store-name" still hold the state from previous runs after a restart? Let's say I send 50 records to topic foo and they get materialized in the state store. If the application is then restarted, will I still have those 50 records in the state store? If not, is there a way to achieve that?

Thanks!

asked Jul 20 '18 03:07 by sobychacko


People also ask

What is the difference between KTable and KStream?

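A KTable is an update stream: each record is treated as an update that overwrites the previous value for the same key. This means keys are required when using a KTable, although they aren't required when using a KStream, where every record is an independent event. Because it overwrites records, a KTable produces a completely different data structure from a KStream, even from the same source records.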
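To make the overwrite semantics concrete, here is a toy model in plain Java (not the Kafka Streams API): the same three records read as a stream keep all three events, while read as a table they collapse to the latest value per key. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (not the Kafka Streams API): the same records read as a
// record stream (KStream-like) and as an update stream (KTable-like).
class StreamVsTable {

    // KStream-like view: every record is an independent event, so all are kept.
    static List<String[]> asStream(List<String[]> records) {
        return new ArrayList<>(records);
    }

    // KTable-like view: each record overwrites the previous value for its key,
    // which is why a key is mandatory here.
    static Map<String, String> asTable(List<String[]> records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] kv : records) {
            if (kv[0] == null) {
                throw new IllegalArgumentException("table records need a key");
            }
            table.put(kv[0], kv[1]);
        }
        return table;
    }

    public static void main(String[] args) {
        List<String[]> records = Arrays.asList(
                new String[] {"alice", "paris"},
                new String[] {"bob", "rome"},
                new String[] {"alice", "berlin"});
        System.out.println(asStream(records).size()); // 3
        System.out.println(asTable(records));         // {alice=berlin, bob=rome}
    }
}
```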

How are Ktables stored?

Internally, a KTable is implemented using RocksDB and a topic in Kafka. RocksDB stores the current data of the table (note that RocksDB is not an in-memory store; it writes to disk). At the same time, each update to the KTable (i.e., to RocksDB) is written to the corresponding Kafka topic.
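A toy sketch of the store-plus-changelog idea in plain Java, assuming a HashMap stands in for RocksDB and a List stands in for the changelog topic (both simplifications; the class is hypothetical and deletes are omitted). It shows why replaying the changelog is enough to rebuild a store whose local files were lost.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (not Kafka Streams internals): a local key-value store whose every
// update is also appended to a changelog list, so a fresh store can be rebuilt
// by replaying that changelog. Deletes are left out for brevity.
class ChangeloggedStore {

    final Map<String, String> local = new HashMap<>(); // stands in for RocksDB
    final List<String[]> changelog;                     // stands in for the changelog topic

    ChangeloggedStore(List<String[]> changelog) {
        this.changelog = changelog;
    }

    void put(String key, String value) {
        local.put(key, value);
        changelog.add(new String[] {key, value}); // every update is also logged
    }

    // Rebuild local state from scratch by replaying the changelog in order;
    // later entries for a key overwrite earlier ones.
    static ChangeloggedStore restore(List<String[]> changelog) {
        ChangeloggedStore store = new ChangeloggedStore(changelog);
        for (String[] entry : changelog) {
            store.local.put(entry[0], entry[1]);
        }
        return store;
    }

    public static void main(String[] args) {
        List<String[]> topic = new ArrayList<>();
        ChangeloggedStore store = new ChangeloggedStore(topic);
        store.put("a", "1");
        store.put("b", "2");
        store.put("a", "3");
        // Simulate losing the local files: rebuild only from the changelog.
        ChangeloggedStore restored = restore(topic);
        System.out.println(restored.local.equals(store.local)); // true
    }
}
```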

Are Kafka streams persistent?

Kafka Streams uses persistent or in-memory state stores, which are themselves backed by Kafka changelog topics by default, providing fault tolerance.

Is KTable in memory?

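Not by default. KTable state is kept in RocksDB, which stores data on disk (caching part of it in memory), unless you explicitly choose an in-memory store such as Stores.inMemoryKeyValueStore(). When a KTable receives a record with a null value (a tombstone), it deletes that key from the store.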
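A minimal plain-Java illustration of the tombstone rule, under the assumption that a TreeMap stands in for the store (hypothetical class, not the Kafka Streams API): a non-null value upserts the key and a null value removes it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model (not the Kafka Streams API) of KTable update semantics:
// a non-null value upserts the key, a null value (tombstone) deletes it.
class TombstoneDemo {

    static Map<String, String> materialize(List<String[]> records) {
        Map<String, String> table = new TreeMap<>();
        for (String[] kv : records) {
            if (kv[1] == null) {
                table.remove(kv[0]);     // tombstone: drop the key from the store
            } else {
                table.put(kv[0], kv[1]); // upsert: newest value wins
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<String[]> records = Arrays.asList(
                new String[] {"k1", "v1"},
                new String[] {"k2", "v2"},
                new String[] {"k1", null});
        System.out.println(materialize(records)); // {k2=v2}
    }
}
```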


1 Answer

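Yes, the state store is persisted to disk by default. When the application is restarted and its application.id hasn't changed, the state is recovered from disk and still contains all 50 records; if the local state is missing (for example, on a new machine), it is rebuilt from the store's changelog topic. Processing resumes from the last committed offset, so new records are added from the point where the application was killed, stopped, or restarted.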
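Whether the old state is picked up depends mainly on two settings. The sketch below builds them with plain java.util.Properties; the string keys are the values of StreamsConfig.APPLICATION_ID_CONFIG, StreamsConfig.STATE_DIR_CONFIG and StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, while the method name and the example values are hypothetical. By default state.dir lives under the system temporary directory, which some systems clear on reboot, so pointing it at a durable path is a reasonable precaution.

```java
import java.util.Properties;

// Minimal sketch of the settings that decide whether local state is reused.
// The keys are the string values of the corresponding StreamsConfig constants;
// the values passed in are hypothetical.
class RestartSafeConfig {

    static Properties streamsProps(String applicationId, String stateDir, String bootstrap) {
        Properties props = new Properties();
        // Keep this identical across restarts: it names the consumer group,
        // the changelog topics and the local state subdirectory.
        props.put("application.id", applicationId);
        // A durable location instead of the default under the temp directory.
        props.put("state.dir", stateDir);
        props.put("bootstrap.servers", bootstrap);
        return props;
    }

    public static void main(String[] args) {
        Properties p = streamsProps("my-table-app", "/var/lib/kafka-streams", "localhost:9092");
        System.out.println(p.getProperty("application.id")); // my-table-app
    }
}
```

The resulting Properties would then be passed to new KafkaStreams(builder.build(), props).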

Edit: It seems you're missing an aggregation operation on top of the KTable; this is required. See my code example:

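// CustomerKey, ViewPage, ViewPageCountKey, INPUT_TOPIC, WINDOW_SIZE,
// WINDOW_ADVANCE and STORE_NAME are defined elsewhere
final KStream<CustomerKey, ViewPage> viewPagesStream = builder.stream(INPUT_TOPIC);

final KTable<Windowed<ViewPageCountKey>, Long> uniqueViewPageCount = viewPagesStream
        // re-key each page view by (project id, URL)
        .map((key, value) -> {
            ViewPageCountKey newKey = new ViewPageCountKey(key.getProjectId(), value.getUrl());
            return new KeyValue<>(newKey, value);
        })
        .filter((key, value) -> key != null)
        // grouping after a key change repartitions through an internal topic
        .groupByKey()
        // hopping windows of WINDOW_SIZE, advancing by WINDOW_ADVANCE
        .windowedBy(TimeWindows.of(WINDOW_SIZE).advanceBy(WINDOW_ADVANCE))
        // counts are kept in a persistent window store named STORE_NAME
        .count(Materialized.as(STORE_NAME));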
answered Jan 01 '23 20:01 by Matus Cimerman