If I use a persistent store when materializing a KTable, will the state store be persistent across application restarts? For example, if I use the following:
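import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
StreamsBuilder builder = new StreamsBuilder();
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore("queryable-store-name");
KTable<Long, String> table = builder.table(
        "foo",
        Materialized.<Long, String>as(storeSupplier) // explicit type witness so the serde calls compile
                .withKeySerde(Serdes.Long())
                .withValueSerde(Serdes.String()));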
Will the state store "queryable-store-name" still contain the state from previous runs after a restart? Let's say I send 50 records to topic foo and they get materialized in the state store. If the application is then restarted, will I still have those 50 records in the state store? If not, is there a way to achieve that?
Thanks!
A KTable is an update (changelog) stream: each record overwrites the previous value for its key. This means that keys are required when using a KTable, although they aren't required when using a KStream. By overwriting records, a KTable creates a completely different data structure from a KStream, even given the same source records.
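To make the stream/table difference concrete, here is a toy model in plain Java (not the Kafka Streams API): the same records are read once as a stream, where every record is kept, and once as a table, where a later record overwrites the earlier one with the same key. The class StreamVsTable, the Rec holder, and the method names are hypothetical; records with a null key are simply skipped in the table view.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model, NOT the Kafka Streams API: the same records read as a stream vs. as a table.
final class StreamVsTable {
    // Minimal key/value pair standing in for a Kafka record.
    static final class Rec {
        final Long key;
        final String value;
        Rec(Long key, String value) { this.key = key; this.value = value; }
    }

    // Stream view: every record is an independent event; nothing is overwritten.
    static List<String> asStream(List<Rec> records) {
        List<String> events = new ArrayList<>();
        for (Rec r : records) {
            events.add(r.key + "=" + r.value);
        }
        return events;
    }

    // Table view: a later record with the same key replaces the earlier value.
    static Map<Long, String> asTable(List<Rec> records) {
        Map<Long, String> table = new LinkedHashMap<>();
        for (Rec r : records) {
            if (r.key == null) {
                continue; // without a key there is no row to update
            }
            table.put(r.key, r.value);
        }
        return table;
    }

    public static void main(String[] args) {
        List<Rec> records = List.of(new Rec(1L, "a"), new Rec(2L, "b"), new Rec(1L, "c"));
        System.out.println(asStream(records)); // [1=a, 2=b, 1=c]
        System.out.println(asTable(records));  // {1=c, 2=b}
    }
}
```

Three records produce three stream events but only two table rows, because key 1 was updated rather than appended.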
Internally, a KTable is by default implemented using RocksDB and a topic in Kafka. RocksDB stores the current data of the table (note that RocksDB is not an in-memory store; it writes its data to local disk). At the same time, each update to the KTable (i.e., to RocksDB) is written to the corresponding Kafka changelog topic.
KTables in Kafka Streams are backed by a persistent or in-memory state store, which in turn is backed by a Kafka changelog topic, providing fault tolerance.
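With the default persistent store, a KTable is stored in RocksDB, which keeps its data on local disk (caching part of it in memory) rather than purely in memory. When a KTable receives a record with a null value (a tombstone), it deletes that key from RocksDB, freeing the space.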
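The store-plus-changelog design and the tombstone deletes can be sketched as a toy model in plain Java. This is an illustration under simplifying assumptions, not how Kafka Streams is implemented: a HashMap stands in for RocksDB, an in-memory list stands in for the changelog topic, and the class ChangeloggedStore is hypothetical. Because every update, including a null-value delete, is logged, replaying the log rebuilds the same state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model, NOT Kafka Streams: a local key-value store whose every update is also
// appended to a changelog, so the store can be rebuilt by replaying that log.
final class ChangeloggedStore {
    static final class Change {
        final long key;
        final String value; // null means "delete this key" (a tombstone)
        Change(long key, String value) { this.key = key; this.value = value; }
    }

    private final Map<Long, String> local = new HashMap<>(); // stands in for RocksDB
    private final List<Change> changelog;                    // stands in for the changelog topic

    ChangeloggedStore(List<Change> changelog) { this.changelog = changelog; }

    void put(long key, String value) {
        apply(key, value);
        changelog.add(new Change(key, value)); // every update is logged, tombstones included
    }

    String get(long key) { return local.get(key); }

    int size() { return local.size(); }

    // Rebuild the local state from scratch, e.g. after the local state directory was lost.
    static ChangeloggedStore restore(List<Change> changelog) {
        ChangeloggedStore store = new ChangeloggedStore(changelog);
        for (Change c : changelog) {
            store.apply(c.key, c.value);
        }
        return store;
    }

    private void apply(long key, String value) {
        if (value == null) {
            local.remove(key); // tombstone: delete the row and free its space
        } else {
            local.put(key, value);
        }
    }

    public static void main(String[] args) {
        List<Change> log = new ArrayList<>();
        ChangeloggedStore store = new ChangeloggedStore(log);
        store.put(1, "a");
        store.put(2, "b");
        store.put(1, null); // delete key 1
        ChangeloggedStore rebuilt = ChangeloggedStore.restore(log);
        System.out.println(rebuilt.size() + " " + rebuilt.get(2)); // 1 b
    }
}
```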
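Yes, the state store is persisted on disk by default. When the application is restarted and its application.id (and the state.dir location) hasn't changed, the state is recovered from local disk and still contains all 50 records. If the local state is missing, Kafka Streams restores it from Kafka (the changelog topic) instead. Processing then resumes from the last committed offset, so new records are added from the point where the application was killed, stopped, or restarted.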
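A rough sketch of that restart behaviour, again as a hypothetical plain-Java toy rather than Kafka Streams itself: the table rows and the committed input offset are saved to a file under a state directory keyed by the application id. Real Kafka Streams keeps the data in RocksDB under state.dir and commits offsets to Kafka; here both live in one Properties file purely for simplicity, and the class RestartableTableApp is made up for illustration.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Properties;

// Toy model, NOT the Kafka Streams API: table rows and the committed input offset
// are kept in a file under <stateDir>/<applicationId>.
final class RestartableTableApp {
    private final Path file;
    private final Properties state = new Properties(); // "k.<key>" -> value, plus "offset"

    RestartableTableApp(Path stateDir, String applicationId) {
        file = stateDir.resolve(applicationId).resolve("table.properties");
        if (Files.exists(file)) {
            try (Reader in = Files.newBufferedReader(file)) {
                state.load(in); // restart with the same id: reload previous state from disk
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    // Read the input "topic" from the committed offset onward, upsert by key, then persist.
    void poll(List<String[]> topic) {
        for (int i = (int) committedOffset(); i < topic.size(); i++) {
            String[] rec = topic.get(i); // rec[0] = key, rec[1] = value
            state.setProperty("k." + rec[0], rec[1]);
        }
        state.setProperty("offset", Integer.toString(topic.size()));
        try {
            Files.createDirectories(file.getParent());
            try (Writer out = Files.newBufferedWriter(file)) {
                state.store(out, null);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    long committedOffset() {
        return Long.parseLong(state.getProperty("offset", "0"));
    }

    long rowCount() {
        return state.stringPropertyNames().stream().filter(k -> k.startsWith("k.")).count();
    }
}
```

Creating a second instance with the same applicationId reloads the 50 rows and continues from offset 50, while a different applicationId gets its own empty directory and starts from offset 0.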
Edit: For comparison, here is how I materialize a state store through an aggregation (a windowed count) on top of the stream. See my code example:
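import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
final KStream<CustomerKey, ViewPage> viewPagesStream = builder.stream(INPUT_TOPIC);
final KTable<Windowed<ViewPageCountKey>, Long> uniqueViewPageCount = viewPagesStream
        .map((key, value) -> {
            // re-key each page view by (projectId, url)
            ViewPageCountKey newKey = new ViewPageCountKey(key.getProjectId(), value.getUrl());
            return new KeyValue<>(newKey, value);
        })
        .filter((key, value) -> key != null)
        .groupByKey() // repartitions using the configured default serdes for these custom types
        .windowedBy(TimeWindows.of(WINDOW_SIZE).advanceBy(WINDOW_ADVANCE))
        .count(Materialized.as(STORE_NAME)); // windowed counts are kept in the store STORE_NAME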
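The TimeWindows.of(WINDOW_SIZE).advanceBy(WINDOW_ADVANCE) call defines hopping windows, so one page view is counted in every window that covers its timestamp. The toy model below (plain Java, not the Kafka Streams API; HoppingWindowCount and its methods are hypothetical) shows that assignment, assuming windows of [start, start + size) whose starts are multiples of the advance beginning at 0, and counts events per key and window start.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model, NOT the Kafka Streams API: counting over hopping windows of length
// sizeMs that start every advanceMs milliseconds (starts aligned to multiples of advanceMs).
final class HoppingWindowCount {
    // Start times of all windows [start, start + sizeMs) that contain the timestamp.
    static List<Long> windowStartsFor(long timestamp, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long start = Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (; start <= timestamp; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    // Count events per (key, window start); the map key is "<key>@<windowStart>".
    static Map<String, Long> count(List<String> keys, List<Long> timestamps, long sizeMs, long advanceMs) {
        Map<String, Long> counts = new TreeMap<>();
        for (int i = 0; i < keys.size(); i++) {
            for (long start : windowStartsFor(timestamps.get(i), sizeMs, advanceMs)) {
                counts.merge(keys.get(i) + "@" + start, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(windowStartsFor(12, 10, 5)); // [5, 10]
        System.out.println(count(List.of("p1/home", "p1/home"), List.of(3L, 12L), 10, 5));
    }
}
```

With a 10 ms window advancing every 5 ms, a view at t = 12 lands in the windows starting at 5 and 10, which is why hopping-window counts overlap.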