I have a simple streams application that takes one topic as its input stream and transforms the KeyValues into another, like this:
StoreBuilder<KeyValueStore<Long, CategoryDto>> builder =
    Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(CategoryTransformer.STORE_NAME),
        Serdes.Long(), CATEGORY_JSON_SERDE);

streamsBuilder.addStateStore(builder)
    .stream(categoryTopic, Consumed.with(Serdes.Long(), CATEGORY_JSON_SERDE))
    .transform(CategoryTransformer::new, CategoryTransformer.STORE_NAME);

static class CategoryTransformer implements Transformer<Long, CategoryDto, KeyValue<Long, CategoryDto>> {

    static final String STORE_NAME = "test-store";

    private KeyValueStore<Long, CategoryDto> store;

    @Override
    public void init(ProcessorContext context) {
        store = (KeyValueStore<Long, CategoryDto>) context.getStateStore(STORE_NAME);
    }

    @Override
    public KeyValue<Long, CategoryDto> transform(Long key, CategoryDto value) {
        store.put(key, value);
        return KeyValue.pair(key, value);
    }

    @Override
    public KeyValue<Long, CategoryDto> punctuate(long timestamp) {
        return null;
    }

    @Override
    public void close() {
    }
}

Here I had to use a transformer because I need to fetch the store and update the relevant value.
My question is: what is the difference between using a local state store and just putting the values into a simple HashMap inside a ForeachAction?
What is the advantage of using local state stores in this case?
Although it is not shown in your code, I'm assuming you somehow read and use the stored state.
Storing your state in a simple in-memory HashMap means it is not persistent at all, so it will be lost whenever the application crashes, is stopped, or goes through a rebalance. None of these are out of the ordinary; assume they will happen quite often.
The problem with non-persistent state is that when any of the above happens, Kafka Streams restarts processing at the last committed offset. Records processed before the crash/stop/rebalance are therefore not reprocessed, so your HashMap will be empty when processing resumes. This is certainly not what you want.
On the other hand, if you use one of the provided state stores, Kafka Streams ensures that once processing restarts after any of the interruptions listed above, the state is available as if processing had never stopped, without reprocessing any of the previously processed records.
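To make the difference concrete, here is a minimal plain-Java simulation of the two restart scenarios. It does not use the Kafka Streams API at all: the input topic, the committed offset and the changelog are modelled with ordinary lists and an int, and the names StateRestoreSketch, restartWithHashMap and restartWithChangelog are hypothetical. It also assumes the store's changelog logging is left enabled, which as far as I know is the default for the provided store builders.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulation only: lists stand in for the input topic and the changelog topic.
class StateRestoreSketch {

    // Restart with a plain HashMap: the map starts out empty and only records
    // at or after the committed offset are processed, so older entries are gone.
    static Map<Long, String> restartWithHashMap(List<Map.Entry<Long, String>> input,
                                                int committedOffset) {
        Map<Long, String> state = new HashMap<>();
        for (Map.Entry<Long, String> rec : input.subList(committedOffset, input.size())) {
            state.put(rec.getKey(), rec.getValue());
        }
        return state;
    }

    // Restart with a changelog-backed store: the state is first rebuilt by
    // replaying the changelog, then processing resumes at the committed offset.
    static Map<Long, String> restartWithChangelog(List<Map.Entry<Long, String>> input,
                                                  int committedOffset,
                                                  List<Map.Entry<Long, String>> changelog) {
        Map<Long, String> state = new HashMap<>();
        for (Map.Entry<Long, String> entry : changelog) {
            state.put(entry.getKey(), entry.getValue());
        }
        for (Map.Entry<Long, String> rec : input.subList(committedOffset, input.size())) {
            state.put(rec.getKey(), rec.getValue());
            changelog.add(rec); // every store update is also appended to the changelog
        }
        return state;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, String>> input = List.of(
                Map.entry(1L, "books"), Map.entry(2L, "music"), Map.entry(3L, "games"));

        // Two records were processed and committed before the simulated crash.
        int committedOffset = 2;
        List<Map.Entry<Long, String>> changelog =
                new ArrayList<>(input.subList(0, committedOffset));

        System.out.println("HashMap after restart:     "
                + restartWithHashMap(input, committedOffset));
        System.out.println("State store after restart: "
                + restartWithChangelog(input, committedOffset, changelog));
    }
}
```

After the simulated restart the HashMap variant only knows about key 3, while the changelog variant still holds all three keys even though records 1 and 2 were never read from the input again.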