Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Aggregration and state store retention in kafka streams

I have a use case like the following. For each incoming event, I want to look at a certain field to see if it's status changed from A to B and if so, send that to an output topic. The flow is like this: An event with key "xyz" comes in with status A, and some time later another event comes in with key "xyz" with status B. I have this code using the high level DSL.

final KStream<String, DomainEvent> inputStream....

final KStream<String, DomainEvent> outputStream = inputStream
          .map((k, v) -> new KeyValue<>(v.getId(), v))
                    .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
                    .aggregate(DomainStatusMonitor::new,
                            (k, v, aggregate) -> {
                                aggregate.updateStatusMonitor(v);
                                return aggregate;
                            }, Materialized.with(Serdes.String(), jsonSerde))
                    .toStream()
                    .filter((k, v) -> v.isStatusChangedFromAtoB())
                    .map((k,v) -> new KeyValue<>(k, v.getDomainEvent()));

Is there a better way to write this logic using the DSL?

Couple of questions regarding the state store created by the aggregation in the code above.

  1. Is it creating an in-memory state store by default?
  2. What will happen if I have an unbounded number of unique incoming keys? If it is using an in-memory store by default, don't I need to switch to a persistent store? How do we handle situations like that in the DSL?
  3. If the state store is very large (either in-memory or persistent), how does it affect the startup time? How can I make the stream processing to wait so that the store gets fully initialized? Or will Kafka Streams ensure that the state store is fully initialized before any incoming events are processed?

Thanks in advance!

like image 616
sobychacko Avatar asked Jul 22 '18 01:07

sobychacko


People also ask

What is aggregate in Kafka Streams?

aggregate. Aggregate the values of records in this stream by the grouped key. Records with null key or value are ignored. Aggregating is a generalization of combining via reduce(...) as it, for example, allows the result to have a different type than the input values.

What is state store in Kafka Streams?

State. Kafka Streams provides so-called state stores, which can be used by stream processing applications to store and query data, which is an important capability when implementing stateful operations.

How does Kafka aggregation work?

In the Kafka Streams DSL, an input stream of an aggregation operation can be a KStream or a KTable, but the output stream will always be a KTable. This allows Kafka Streams to update an aggregate value upon the out-of-order arrival of further records after the value was produced and emitted.


1 Answers

  1. By default, a persistent RocksDB store will be used. If you want to use an in-memory store, you would pass in Materialized.as(Stores.inMemoryKeyValueStore(...))

  2. If you have an infinite number of unique keys, you will eventually run out of main-memory or disk and your application will die. Depending on your semantics, you can get a "TTL" by using a session windowed aggregation with a large "gap" parameter instead to expire old keys.

  3. The state will always be restored before processing new data happens. If you use in-memory store, this will happen by consuming the underlying changelog topic. Depending on the size of your state, this can take a while. If you use persistent RocksDB store, the state will be loaded from disk and thus no restore will be required and processing should happen immediately. Only if you loose the state on local disk, a restore from the changelog topic will happen for this case.

like image 59
Matthias J. Sax Avatar answered Oct 16 '22 20:10

Matthias J. Sax