Does the Kafka Streams aggregation stage serialize and deserialize every single element?

I noticed that the aggregate() stage seems to serialize/deserialize every single element, even though it emits a result periodically.

  streamBuilder
    .stream(inputTopic, Consumed.`with`(keySerde, inputValueSerde))
    .groupByKey(Serialized.`with`(keySerde, inputValueSerde))
    .aggregate(
      () => Snapshot.Initial(),                // initializer: empty snapshot
      (_, event, prevSnap: Snapshot) => {      // adder: fold each event into the previous snapshot
        // ...
      },
      Materialized.as(stateStoreName).withValueSerde(snapshotSerde)
    )
    .toStream()

I was hoping that the key-value store would work in memory until a write happens on commit. It looks like not only is a write made for every single update, but there is also a read that deserializes the previous value. Can someone explain how this works underneath, and whether I should be concerned about performance?

asked May 28 '19 by kciesielski

People also ask

How does Kafka aggregation work?

In the Kafka Streams DSL, an input stream of an aggregation operation can be a KStream or a KTable, but the output stream will always be a KTable. This allows Kafka Streams to update an aggregate value upon the out-of-order arrival of further records after the value was produced and emitted.
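
A minimal sketch of this behavior; the topic name, serdes, and store name are assumptions for illustration:

    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.common.utils.Bytes
    import org.apache.kafka.streams.StreamsBuilder
    import org.apache.kafka.streams.kstream.{Consumed, KTable, Materialized, Serialized}
    import org.apache.kafka.streams.state.KeyValueStore

    val builder = new StreamsBuilder()

    // The input is a KStream, but the aggregation result is always a KTable,
    // so a late or out-of-order record simply updates the existing aggregate value.
    val counts: KTable[String, java.lang.Long] =
      builder
        .stream("events", Consumed.`with`(Serdes.String(), Serdes.String()))
        .groupByKey(Serialized.`with`(Serdes.String(), Serdes.String()))
        .count(Materialized.as[String, java.lang.Long, KeyValueStore[Bytes, Array[Byte]]]("event-counts"))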

What is aggregate in Kafka streams?

Aggregating is a generalization of combining via reduce(...) as it, for example, allows the result to have a different type than the input values. The result is written into a local KeyValueStore (which is basically an ever-updating materialized view) that can be queried by the given store name in materialized .
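
A hedged sketch of the difference, assuming string input values and a Long aggregate (all names are illustrative):

    import java.lang.{Long => JLong}
    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.common.utils.Bytes
    import org.apache.kafka.streams.StreamsBuilder
    import org.apache.kafka.streams.kstream.{Consumed, Materialized, Serialized}
    import org.apache.kafka.streams.state.KeyValueStore

    val builder = new StreamsBuilder()

    builder
      .stream("text-lines", Consumed.`with`(Serdes.String(), Serdes.String()))
      .groupByKey(Serialized.`with`(Serdes.String(), Serdes.String()))
      .aggregate(
        () => JLong.valueOf(0L),                    // initializer: starting aggregate
        (_: String, line: String, total: JLong) =>  // result type (Long) differs from the
          JLong.valueOf(total + line.length),       // input type (String); reduce() cannot do this
        Materialized
          .as[String, JLong, KeyValueStore[Bytes, Array[Byte]]]("chars-per-key")
          .withValueSerde(Serdes.Long())            // serde for the aggregate type
      )

Because the store is named, it can later be read through interactive queries, e.g. via KafkaStreams#store with QueryableStoreTypes.keyValueStore().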

What is serialization and Deserialization in Kafka?

Serialization is the process of converting objects into bytes. Deserialization is the inverse process: converting a stream of bytes back into an object. In a nutshell, deserialization turns raw bytes back into readable, interpretable information.

How does Kafka serialize data?

Serialization is the process of converting an object into a stream of bytes that are used for transmission. Kafka stores and transmits these bytes of arrays in its queue. Deserialization, as the name suggests, does the opposite of serialization, in which we convert bytes of arrays into the desired data type.
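
As a concrete (hypothetical) sketch, a custom Serde for the question's Snapshot type could pair a serializer and deserializer like this; the encoding shown is illustrative only:

    import java.nio.charset.StandardCharsets
    import org.apache.kafka.common.serialization.{Deserializer, Serde, Serdes, Serializer}

    final case class Snapshot(count: Long)

    // object -> bytes (what Kafka stores and transmits)
    val snapshotSerializer: Serializer[Snapshot] =
      (_: String, snap: Snapshot) => snap.count.toString.getBytes(StandardCharsets.UTF_8)

    // bytes -> object (the inverse)
    val snapshotDeserializer: Deserializer[Snapshot] =
      (_: String, bytes: Array[Byte]) => Snapshot(new String(bytes, StandardCharsets.UTF_8).toLong)

    val snapshotSerde: Serde[Snapshot] = Serdes.serdeFrom(snapshotSerializer, snapshotDeserializer)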


1 Answer

Your observation that data is always (de)serialized is correct, even if all data is held in memory. All stores in Kafka Streams are based on byte[] arrays to allow for proper memory management: deserialized on-heap Java objects have an unknown size, which makes memory management hard and memory usage unpredictable.
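
Conceptually (this is not Kafka's actual class, just a sketch of the idea), every store access crosses a serialization boundary like this:

    import org.apache.kafka.common.serialization.Serde
    import scala.collection.mutable

    // Sketch of a bytes-backed store: memory holds only byte arrays, so the
    // footprint is measurable, but every put serializes and every get
    // (e.g. fetching the previous aggregate) deserializes.
    final class BytesBackedStore[K, V](keySerde: Serde[K], valueSerde: Serde[V]) {
      private val inner = mutable.Map.empty[Seq[Byte], Array[Byte]]

      def put(key: K, value: V): Unit =
        inner(keySerde.serializer.serialize(null, key).toSeq) =
          valueSerde.serializer.serialize(null, value)       // write path: object -> bytes

      def get(key: K): Option[V] =
        inner
          .get(keySerde.serializer.serialize(null, key).toSeq)
          .map(valueSerde.deserializer.deserialize(null, _)) // read path: bytes -> object
    }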

Your store would still work in-memory, and writing to disk only happens when necessary and on commit.
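
If flush frequency matters for your throughput, two settings control it; a minimal sketch follows (the application id and bootstrap server are placeholders):

    import java.util.Properties
    import org.apache.kafka.streams.StreamsConfig

    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "snapshot-aggregator")   // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")     // placeholder
    // A larger cache and a longer commit interval mean fewer flushes and
    // downstream updates; records are still (de)serialized into the
    // byte[]-based cache itself.
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, (10 * 1024 * 1024).toString)
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "30000")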

answered Sep 17 '22 by Matthias J. Sax