I noticed that the aggregate() stage seems to serialize/deserialize every single element, even though it only emits a result periodically.
streamBuilder
  .stream(inputTopic, Consumed.`with`(keySerde, inputValueSerde))
  .groupByKey(Serialized.`with`(keySerde, inputValueSerde))
  .aggregate(
    () => Snapshot.Initial(),
    (_, event, prevSnap: Snapshot) => {
      // ...
    },
    Materialized.as(stateStoreName).withValueSerde(snapshotSerde)
  )
  .toStream()
I was hoping that the key-value store would work in memory until a write happens on commit. It looks like not only is a write made for every single update, but there is also a read that deserializes the stored value back. Can someone explain how this works underneath and whether I should be concerned about performance?
In the Kafka Streams DSL, an input stream of an aggregation operation can be a KStream or a KTable, but the output stream will always be a KTable. This allows Kafka Streams to update an aggregate value upon the out-of-order arrival of further records after the value was produced and emitted.
Aggregating is a generalization of combining via reduce(...) as it, for example, allows the result to have a different type than the input values. The result is written into a local KeyValueStore (which is basically an ever-updating materialized view) that can be queried using the store name given in the Materialized parameter.
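Because the store is materialized under an explicit name, you can read the latest aggregate back via interactive queries. A minimal sketch, assuming the topology above is running; streams, someKey, and the Key type are assumptions not present in the question:

import org.apache.kafka.streams.{KafkaStreams, StoreQueryParameters}
import org.apache.kafka.streams.state.{QueryableStoreTypes, ReadOnlyKeyValueStore}

// `streams` is the running KafkaStreams instance for the topology above (assumed).
val store: ReadOnlyKeyValueStore[Key, Snapshot] =
  streams.store(
    StoreQueryParameters.fromNameAndType(
      stateStoreName,
      QueryableStoreTypes.keyValueStore[Key, Snapshot]()
    )
  )

// Returns the current aggregate for the key, or null if the key is absent.
val latestSnapshot: Snapshot = store.get(someKey)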
Serialization is the process of converting an object into a stream of bytes for storage or transmission; Kafka stores and transmits these byte arrays in its topics. Deserialization is the inverse process: converting a stream of bytes back into an object of the desired type.
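In the topology above, that byte conversion is supplied by serdes such as snapshotSerde. A minimal sketch of a hand-rolled serde, assuming hypothetical Snapshot.toBytes/Snapshot.fromBytes helpers (in practice you would plug in JSON, Avro, Protobuf, etc.):

import org.apache.kafka.common.serialization.{Deserializer, Serde, Serdes, Serializer}

// The toBytes/fromBytes helpers are assumed stand-ins for a real encoding.
val snapshotSerializer: Serializer[Snapshot] =
  (_: String, snap: Snapshot) => Snapshot.toBytes(snap)

val snapshotDeserializer: Deserializer[Snapshot] =
  (_: String, bytes: Array[Byte]) => Snapshot.fromBytes(bytes)

val snapshotSerde: Serde[Snapshot] =
  Serdes.serdeFrom(snapshotSerializer, snapshotDeserializer)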
Your observation that data is always (de)serialized is correct, even if all data is in memory. All stores in Kafka Streams are based on byte[] arrays to allow for proper memory management. Deserialized on-heap Java objects have an unknown size, which makes memory management hard and memory usage unpredictable.
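This is also why every update pays a (de)serialization cost: aggregate() is effectively a read-modify-write cycle over a bytes-based store. A simplified sketch of what happens per record (illustrative only, not the actual Kafka Streams internals):

import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.state.KeyValueStore

def processRecord[K, V, Agg](
    key: K,
    value: V,
    store: KeyValueStore[Bytes, Array[Byte]], // DSL stores hold raw bytes
    keySerde: Serde[K],
    aggSerde: Serde[Agg],
    initializer: () => Agg,
    aggregator: (K, V, Agg) => Agg
): Unit = {
  val rawKey = Bytes.wrap(keySerde.serializer.serialize("topic", key))
  // 1. Read: bytes come back and must be deserialized, even from an in-memory store.
  val oldBytes = store.get(rawKey)
  val oldAgg =
    if (oldBytes == null) initializer()
    else aggSerde.deserializer.deserialize("topic", oldBytes)
  // 2. Modify: apply the user-supplied aggregator.
  val newAgg = aggregator(key, value, oldAgg)
  // 3. Write: the new aggregate is serialized back to bytes.
  store.put(rawKey, aggSerde.serializer.serialize("topic", newAgg))
}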
Your store still works in memory, though, and writing to disk happens only when necessary, such as on commit or when the cache fills up.
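How much buffering happens before a flush is tunable through the record cache size and the commit interval. A small sketch of the relevant StreamsConfig settings (the values are illustrative, not recommendations):

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
// Larger cache = more updates buffered in memory before flushing to the store/changelog.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, (50 * 1024 * 1024).toString) // 50 MB
// Longer commit interval = less frequent flushing and downstream forwarding of cached results.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "30000") // 30 s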