
Kafka Streams local state stores

I have a simple streams application that takes one topic as an input stream and transforms its KeyValues into another, like this:

StoreBuilder<KeyValueStore<Long, CategoryDto>> builder =
    Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(CategoryTransformer.STORE_NAME),
                                Serdes.Long(), CATEGORY_JSON_SERDE);
streamsBuilder.addStateStore(builder)
              .stream(categoryTopic, Consumed.with(Serdes.Long(), CATEGORY_JSON_SERDE))
              .transform(CategoryTransformer::new, CategoryTransformer.STORE_NAME);

static class CategoryTransformer implements Transformer<Long, CategoryDto, KeyValue<Long, CategoryDto>> {

    static final String STORE_NAME = "test-store";

    private KeyValueStore<Long, CategoryDto> store;

    @Override
    public void init(ProcessorContext context) {
        // Look up the store that was connected to this transformer by name.
        store = (KeyValueStore<Long, CategoryDto>) context.getStateStore(STORE_NAME);
    }

    @Override
    public KeyValue<Long, CategoryDto> transform(Long key, CategoryDto value) {
        // Remember the latest value per key, then forward the record unchanged.
        store.put(key, value);
        return KeyValue.pair(key, value);
    }

    @Override
    public KeyValue<Long, CategoryDto> punctuate(long timestamp) {
        // No scheduled work; nothing to emit.
        return null;
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}

Here I had to use a transformer because I need to fetch the store and update the relevant value.

The question is: what is the difference between using local state stores and just putting values into a simple HashMap inside a ForeachAction?

What is the advantage of using local state stores in this case?

px5x2 asked Dec 13 '25 10:12


1 Answer

Although it is not shown in your code, I'm assuming you somehow read and use the stored state.

Storing your state in a plain in-memory HashMap means it is not persistent at all: the state is lost whenever any of the following happens (none of these is out of the ordinary; assume they will happen quite often):

  • your stream processor/application stops,
  • it crashes, or
  • it is partially migrated elsewhere (to another JVM) due to rebalancing.

The problem with non-persistent state is that when any of the above happens, Kafka Streams restarts processing at the last committed offset. Records processed before the crash/stop/rebalance are therefore not reprocessed, so your HashMap starts out empty when processing resumes and never regains the entries built from those earlier records. This is certainly not what you want.
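To make the failure mode concrete, here is a minimal simulation in plain Java. It does not use Kafka at all: a List stands in for one topic partition, an int stands in for the committed offset, and the class and method names (HashMapRestartSketch, process, stateAfterRestart) are made up for this illustration. It only sketches the effect of resuming at the committed offset with a fresh HashMap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulation only (assumption: no Kafka involved). A List plays the role of
// one topic partition and an int plays the role of the committed offset.
class HashMapRestartSketch {

    // Applies the records in [fromOffset, toOffset) to the in-memory state
    // and returns the offset of the next record to read.
    static int process(List<String[]> partition, int fromOffset, int toOffset,
                       Map<String, String> state) {
        for (int offset = fromOffset; offset < toOffset; offset++) {
            String[] record = partition.get(offset); // record[0] = key, record[1] = value
            state.put(record[0], record[1]);
        }
        return toOffset;
    }

    // Processes three records, "crashes", restarts at the committed offset,
    // and returns whatever state the restarted instance ends up with.
    static Map<String, String> stateAfterRestart() {
        List<String[]> partition = new ArrayList<>();
        partition.add(new String[] {"1", "books"});
        partition.add(new String[] {"2", "music"});
        partition.add(new String[] {"3", "games"});
        partition.add(new String[] {"4", "tools"});

        Map<String, String> state = new HashMap<>();
        int committedOffset = process(partition, 0, 3, state);

        // Crash: everything on the heap is gone, only the committed offset survives.
        state = new HashMap<>();

        // Restart: processing resumes at the committed offset, not at offset 0.
        process(partition, committedOffset, partition.size(), state);
        return state;
    }

    public static void main(String[] args) {
        System.out.println(stateAfterRestart());
    }
}
```

After the simulated restart the map only holds the record that arrived after the committed offset; the three entries built before the crash are gone, which is the situation described above.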

On the other hand, if you use one of the provided state stores, Kafka Streams ensures that once processing restarts after any of the interruptions listed above, the state is available as if processing had never stopped, without reprocessing any of the previously processed records.
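As far as I know, the way Kafka Streams achieves this by default is by backing each state store (including in-memory ones) with a changelog topic and replaying it when the store has to be rebuilt. The sketch below imitates that idea in plain Java, with no Kafka involved: a List stands in for the durable changelog, and LoggedStore and storeAfterRestart are hypothetical names invented for this example, not Kafka Streams classes.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulation only (assumption: no Kafka involved). The changelog List plays
// the role of a durable log that outlives any single application instance.
class ChangelogRestoreSketch {

    // Hypothetical stand-in for a fault-tolerant key-value store:
    // every write is appended to the changelog as well as the local map.
    static final class LoggedStore {
        private final List<Map.Entry<Long, String>> changelog;
        private final Map<Long, String> local = new HashMap<>();

        LoggedStore(List<Map.Entry<Long, String>> changelog) {
            this.changelog = changelog;
            // Restore: replay the changelog in order to rebuild the local view.
            for (Map.Entry<Long, String> entry : changelog) {
                local.put(entry.getKey(), entry.getValue());
            }
        }

        void put(Long key, String value) {
            changelog.add(new AbstractMap.SimpleImmutableEntry<Long, String>(key, value));
            local.put(key, value);
        }

        String get(Long key) {
            return local.get(key);
        }

        int size() {
            return local.size();
        }
    }

    // Writes some state, "crashes", and returns the store as seen by a new instance.
    static LoggedStore storeAfterRestart() {
        List<Map.Entry<Long, String>> changelog = new ArrayList<>(); // survives the crash

        LoggedStore before = new LoggedStore(changelog);
        before.put(1L, "books");
        before.put(2L, "music");
        before.put(1L, "e-books"); // later update for the same key

        // Crash: 'before' and its local map are discarded; only the changelog remains.
        LoggedStore after = new LoggedStore(changelog);
        after.put(3L, "games"); // processing continues with the restored state
        return after;
    }

    public static void main(String[] args) {
        LoggedStore store = storeAfterRestart();
        System.out.println(store.get(1L) + ", " + store.get(2L) + ", " + store.get(3L));
    }
}
```

The restarted instance sees the latest value for every key written before the crash without re-reading the input records; only the store's own log of writes is replayed.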

Frederic A. answered Dec 16 '25 00:12


