 

How to enable caching on an in-memory Kafka Streams state store

I want to reduce the amount of data being sent downstream, and since I only care about the latest value for a given key, I'm reading data from a topic this way:

KTable table = builder.table("inputTopic", Materialized.as("myStore"));

Why? Because under the hood the data is being cached, as described here, and forwarded downstream only when commit.interval.ms elapses or cache.max.bytes.buffering kicks in.
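These are the two settings involved - a minimal sketch with example values, just to illustrate where they are configured:

import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

Properties props = new Properties();
// forward cached records downstream at most every 10 seconds (example value)
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);
// total record cache size across all stream threads: 10 MB (example value)
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);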

So far so good, but since I'm not taking advantage of RocksDB at all in this case, I'd like to replace it with an in-memory store. I explicitly enable caching, just in case.

Materialized.as(Stores.inMemoryKeyValueStore("myStore")).withCachingEnabled();
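In context, the table is built roughly like this - a sketch assuming String keys and values and the topic/store names from above:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

StreamsBuilder builder = new StreamsBuilder();
// back the KTable with an in-memory store instead of the default RocksDB store
KTable<String, String> table = builder.table("inputTopic",
        Materialized.<String, String>as(Stores.inMemoryKeyValueStore("myStore"))
                .withCachingEnabled());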

It doesn't work, though - the data is not being cached and every record is being sent downstream.

Is there another way to enable caching? Or perhaps there's a better way to achieve what I'm trying to do?

asked by Dth


1 Answer

It seems I was wrong, and caching on an in-memory state store works as expected. I'll briefly show how I tested it; perhaps someone will find it useful. I made a very basic Kafka Streams application that just reads from a topic abstracted as a KTable.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

import static org.apache.kafka.streams.StreamsConfig.*;

public class Main {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        Logger logger = LoggerFactory.getLogger(Main.class);

        // Materialize the KTable in an in-memory store with caching explicitly enabled,
        // then log every record that actually gets forwarded downstream.
        builder.table("inputTopic",
                        Materialized.as(Stores.inMemoryKeyValueStore("myStore")).withCachingEnabled())
                .toStream()
                .foreach((k, v) -> logger.info("Result: {} - {}", k, v));

        new KafkaStreams(builder.build(), getProperties()).start();
    }

    private static Properties getProperties() {
        Properties properties = new Properties();
        properties.put(APPLICATION_ID_CONFIG, "testApp");
        properties.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(COMMIT_INTERVAL_MS_CONFIG, 10000);
        properties.put(CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
        properties.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        properties.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        return properties;
    }
}

Then I ran the console producer from Kafka:

./kafka-console-producer.sh --broker-list localhost:9092 --topic inputTopic --property "parse.key=true" --property "key.separator=:"

And sent a few messages: a:a, a:b, a:c. Only the last of them showed up in the app, so the cache works as expected.

2018-03-06 21:21:57 INFO Main:26 - Result: a - c

I've also changed the topology slightly to check whether caching works with the aggregate method as well.

builder.stream("inputTopic")
        .groupByKey()
        .aggregate(() -> "", (k, v, a) -> a + v, Materialized.as(Stores.inMemoryKeyValueStore("aggregate")))
        .toStream()
        .foreach((k, v) -> logger.info("Result: {} - {}", k, v));

I sent a few messages in rapid succession with the same key and received just a single result, so the data was not sent downstream right away - exactly as intended.
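Worth noting: caching is enabled by default for stores materialized through the DSL, which is why the snippet above buffers even without an explicit call. To spell it out anyway, the same topology can be written like this (a sketch, otherwise identical to the snippet above):

builder.stream("inputTopic")
        .groupByKey()
        .aggregate(() -> "", (k, v, a) -> a + v,
                // same in-memory store, with caching stated explicitly
                Materialized.as(Stores.inMemoryKeyValueStore("aggregate")).withCachingEnabled())
        .toStream()
        .foreach((k, v) -> logger.info("Result: {} - {}", k, v));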

answered by Dth


