I have the following use case. For each incoming event, I want to look at a certain field to see if its status changed from A to B and, if so, send that event to an output topic. The flow is like this: an event with key "xyz" comes in with status A, and some time later another event with key "xyz" comes in with status B. I have this code using the high-level DSL.
final KStream<String, DomainEvent> inputStream....
final KStream<String, DomainEvent> outputStream = inputStream
    .map((k, v) -> new KeyValue<>(v.getId(), v))
    .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
    .aggregate(DomainStatusMonitor::new,
        (k, v, aggregate) -> {
            aggregate.updateStatusMonitor(v);
            return aggregate;
        }, Materialized.with(Serdes.String(), jsonSerde))
    .toStream()
    .filter((k, v) -> v.isStatusChangedFromAtoB())
    .map((k, v) -> new KeyValue<>(k, v.getDomainEvent()));
Is there a better way to write this logic using the DSL?
I also have a couple of questions regarding the state store created by the aggregation in the code above.
Thanks in advance!
aggregate. Aggregate the values of records in this stream by the grouped key. Records with null key or value are ignored. Aggregating is a generalization of combining via reduce(...) as it, for example, allows the result to have a different type than the input values.
State. Kafka Streams provides so-called state stores, which can be used by stream processing applications to store and query data, which is an important capability when implementing stateful operations.
In the Kafka Streams DSL, an input stream of an aggregation operation can be a KStream or a KTable, but the output stream will always be a KTable. This allows Kafka Streams to update an aggregate value upon the out-of-order arrival of further records after the value was produced and emitted.
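To make the per-key aggregation concrete, here is a minimal plain-Java sketch of the logic, without any Kafka dependency. The `StatusMonitor` class is a hypothetical stand-in for the `DomainStatusMonitor` from the question (whose implementation is not shown), and a `HashMap` plays the role of the state store. It only illustrates the idea that every record updates the aggregate for its key and that the updated aggregate is then filtered.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical aggregate: remembers the previous and current status for one key.
class StatusMonitor {
    private String previousStatus; // stays null until a second event arrives
    private String currentStatus;

    // Plays the role of updateStatusMonitor(v) in the question.
    void update(String newStatus) {
        previousStatus = currentStatus;
        currentStatus = newStatus;
    }

    boolean isStatusChangedFromAtoB() {
        return "A".equals(previousStatus) && "B".equals(currentStatus);
    }
}

class AggregateSketch {
    // Stand-in for the state store: one aggregate per key.
    private final Map<String, StatusMonitor> store = new HashMap<>();

    // Updates the aggregate for the key and reports whether it now passes the filter.
    boolean process(String key, String status) {
        StatusMonitor monitor = store.computeIfAbsent(key, k -> new StatusMonitor());
        monitor.update(status);
        return monitor.isStatusChangedFromAtoB();
    }

    // Feeds (key, status) pairs through the aggregate and collects the keys that went from A to B.
    static List<String> run(String[][] events) {
        AggregateSketch sketch = new AggregateSketch();
        List<String> emitted = new ArrayList<>();
        for (String[] event : events) {
            if (sketch.process(event[0], event[1])) {
                emitted.add(event[0]);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        String[][] events = { {"xyz", "A"}, {"abc", "B"}, {"xyz", "B"}, {"xyz", "B"} };
        System.out.println(run(events)); // prints [xyz]
    }
}
```

Note that this sketch forwards every single update. In Kafka Streams, record caching on the KTable can collapse consecutive updates for the same key before they reach toStream(), so whether each intermediate aggregate is seen downstream depends on the cache and commit settings.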
By default, a persistent RocksDB store will be used. If you want to use an in-memory store instead, pass in Materialized.as(Stores.inMemoryKeyValueStore(...)).
If you have an unbounded number of unique keys, you will eventually run out of main memory or disk and your application will die. Depending on your semantics, you can get a "TTL" by using a session-windowed aggregation with a large "gap" parameter instead, so that old keys expire.
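The "TTL via inactivity gap" idea can be pictured with a small plain-Java sketch. This is not the Kafka Streams session-window API; the class `GapExpirySketch` and its methods are hypothetical names used only to illustrate how a key disappears once it has been inactive for longer than the gap.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration only; this is not the Kafka Streams session-window API.
class GapExpirySketch {
    private final long gapMs;
    // key -> timestamp of the most recent event seen for that key
    private final Map<String, Long> lastSeen = new HashMap<>();

    GapExpirySketch(long gapMs) {
        this.gapMs = gapMs;
    }

    // Records activity for a key at the given event time.
    void touch(String key, long timestampMs) {
        lastSeen.put(key, timestampMs);
    }

    // Drops every key that has been inactive for longer than the gap.
    void expire(long nowMs) {
        lastSeen.entrySet().removeIf(entry -> nowMs - entry.getValue() > gapMs);
    }

    boolean contains(String key) {
        return lastSeen.containsKey(key);
    }

    int size() {
        return lastSeen.size();
    }
}
```

With a gap of 1000 ms, a key last touched at time 0 is dropped by `expire(1500)`, while a key touched at time 900 is kept. A larger gap keeps keys around longer at the cost of more state.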
The state will always be restored before processing of new data starts. If you use an in-memory store, this happens by consuming the underlying changelog topic. Depending on the size of your state, this can take a while. If you use a persistent RocksDB store, the state is loaded from local disk, so no restore is required and processing should start immediately. Only if you lose the state on local disk will a restore from the changelog topic happen in this case.
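As a rough picture of what restoring from a changelog means, the following plain-Java sketch replays a list of changelog records into an empty map. `ChangelogRestoreSketch` and `Entry` are hypothetical names, and the sketch assumes the usual compacted-topic convention that a null value marks a deleted key. It is an illustration of the replay idea, not the actual Kafka Streams restore code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ChangelogRestoreSketch {
    // One changelog record: the value written for a key (a null value means the key was deleted).
    static final class Entry {
        final String key;
        final String value;

        Entry(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Rebuilds the store by replaying the changelog in order; later records overwrite earlier ones.
    static Map<String, String> restore(List<Entry> changelog) {
        Map<String, String> store = new HashMap<>();
        for (Entry entry : changelog) {
            if (entry.value == null) {
                store.remove(entry.key);
            } else {
                store.put(entry.key, entry.value);
            }
        }
        return store;
    }
}
```

The time this takes grows with the number of records that have to be replayed, which is why restoring a large in-memory store can delay the start of processing.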