
Difference between KTable and local store

What is the difference between these entities?

As I understand it, a KTable is simply a Kafka topic with a compaction deletion policy. Also, if logging is enabled for a KTable, then there is also a changelog, and the deletion policy then becomes compact,delete.

Local store: an in-memory key-value cache based on RocksDB. But a local store also has a changelog.

In both cases, we get the last value for a key for a certain period of time (?). A local store is used for aggregation steps, joins, etc. But a new topic with a compaction strategy is also created for it.

For example:

KStream<K, V> source = builder.stream(topic1);
KTable<K, V> table = builder.table(topic2); // What will happen here if I read data from a topic with cleanup policy compact,delete? Will an additional topic be created to store the data, or will just a local store (cache) be used for it?

// or
KTable<K, V> table2 = builder.table(..., Materialized.as("key-value-store-name")); // What will happen here? As I understand it, I just specified a concrete name for the local store, and now I can query it as a regular key-value store

source.groupByKey().aggregate(initialValue, aggregationLogic, Materialized.as(...)); // Will a new aggregation topic with a compaction deletion policy be created here? Or will only a local store be used?

Also, I can create a state store using builder.addStateStore(...), where I can enable/disable logging (changelog) and caching (???).

I've read this: https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html, but some details are still unclear to me. Especially the case where we can disable the streams cache (but not the RocksDB cache) and get a full copy of a CDC system for a relational database.

Asked Sep 24 '18 at 22:09 by Nikita Ryanov


People also ask

What is a KTable?

A KTable is an abstraction of a changelog stream, where each data record represents an update. More precisely, the value in a data record is interpreted as an “UPDATE” of the last value for the same record key, if any (if a corresponding key doesn't exist yet, the update will be considered an INSERT).
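For illustration, a minimal sketch (topic name and serdes are made up) of how records read as a KTable are interpreted as upserts:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;

// Records on "some-topic" (key, value), in order:
//   ("alice", 1)  -> INSERT: table state is now { alice=1 }
//   ("bob",   3)  -> INSERT: table state is now { alice=1, bob=3 }
//   ("alice", 2)  -> UPDATE: table state is now { alice=2, bob=3 }
StreamsBuilder builder = new StreamsBuilder();
KTable<String, Integer> table = builder.table("some-topic",
        Consumed.with(Serdes.String(), Serdes.Integer()));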

What is the difference between KStream and KTable?

A KStream handles the stream of records. A KTable, on the other hand, manages the changelog stream with the latest state for a given key; each data record represents an update. A KStream is stateless, whereas a KTable is stateful.
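A hedged sketch of that difference (topic name is made up): reading the same two records as a stream vs. as a table gives different per-key results.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

// Input on "purchases": ("alice", 3), then ("alice", 5)
// (for comparison only: within a single topology, a topic can be consumed just once)
StreamsBuilder builder = new StreamsBuilder();

// KStream: both records are independent facts; summing gives alice -> 8.
KStream<String, Long> stream = builder.stream("purchases",
        Consumed.with(Serdes.String(), Serdes.Long()));

// KTable: the second record is an UPDATE; the table holds alice -> 5.
KTable<String, Long> table = builder.table("purchases",
        Consumed.with(Serdes.String(), Serdes.Long()));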

What is the difference between Kafka and Kafka streams?

Apache Kafka is a popular open-source, distributed, fault-tolerant stream processing system. The Kafka Consumer provides the basic functionality to handle messages; Kafka Streams provides real-time stream processing on top of the Kafka Consumer client.
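As a hedged sketch of the contrast (topic names, the consumerProps/streamsProps configs, and the process/transform helpers are hypothetical): with the plain consumer you drive the poll loop yourself, while Kafka Streams lets you declare a topology and runs it for you.

import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;

// Plain Kafka Consumer: you poll and handle each record yourself.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList("input-topic"));
while (true) {
    for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofMillis(100))) {
        process(rec.key(), rec.value()); // hypothetical helper: your own logic and state
    }
}

// Kafka Streams: declare the processing; the library manages state and fault tolerance.
StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("input-topic")
       .mapValues(v -> transform(v)) // hypothetical helper
       .to("output-topic");
new KafkaStreams(builder.build(), streamsProps).start();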

Is KTable in memory?

Not entirely. A KTable is backed by RocksDB, which is not a purely in-memory store: it keeps hot data in memory but also writes to disk. When a KTable receives a null-value record (a tombstone), it deletes that record from RocksDB and the space is freed up.
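A hedged sketch of the deletion part (topic name and producerProps are illustrative): a producer sends a tombstone, ie a record with a null value, and the KTable reading that topic removes the key from its store.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
// Null value = tombstone: the key "alice" is deleted from the KTable's store.
producer.send(new ProducerRecord<>("user-regions", "alice", null));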


1 Answer

A KTable is a logical abstraction of a table that is updated over time. Additionally, you can think of it not as a materialized table, but as a changelog stream that consists of all update records to the table. Compare https://docs.confluent.io/current/streams/concepts.html#duality-of-streams-and-tables. Hence, conceptually a KTable is something of a hybrid, if you wish; however, it's easier to think of it as a table that is updated over time.
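This duality goes both ways; a minimal sketch (topic names are made up):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();

// Table -> stream: every update to the table is a record in the changelog stream.
KTable<String, String> table = builder.table("user-regions");
KStream<String, String> updates = table.toStream();

// Stream -> table: keeping the latest value per key turns a stream into a table.
KTable<String, String> latest = builder
    .stream("region-updates", Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey()
    .reduce((oldValue, newValue) -> newValue);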

Internally, a KTable is implemented using RocksDB and a topic in Kafka. RocksDB stores the current data of the table (note that RocksDB is not an in-memory store and can write to disk). At the same time, each update to the KTable (ie, to RocksDB) is written into the corresponding Kafka topic. The Kafka topic is used for fault-tolerance reasons (note that RocksDB itself is considered ephemeral; it is the changelog topic, not RocksDB's writes to disk, that provides fault tolerance), and is configured with log compaction enabled to make sure that the latest state of RocksDB can be restored by reading from the topic.
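Kafka Streams creates and compacts that changelog topic itself, but you can pass extra configs for it. A sketch, reusing the topic/store names from the question and an illustrative config value:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

Map<String, String> changelogConfig = new HashMap<>();
changelogConfig.put("min.cleanable.dirty.ratio", "0.3"); // illustrative tuning only

StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> table = builder.table(
    "topic2",
    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("key-value-store-name")
        .withLoggingEnabled(changelogConfig)); // changelog topic remains compacted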

If you have a KTable that is created by a windowed aggregation, the Kafka topic is configured with compact,delete to expire old data (ie, old windows) so that the table (ie, RocksDB) does not grow unbounded.
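For example, a sketch of a windowed count (topic name and window size are illustrative); its changelog topic is created with cleanup.policy=compact,delete:

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;

StreamsBuilder builder = new StreamsBuilder();
KTable<Windowed<String>, Long> counts = builder
    .stream("clicks", Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5))) // newer versions: ofSizeWithNoGrace
    .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("windowed-counts"));
// The "windowed-counts" changelog uses compact,delete so expired windows are purged.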

Instead of RocksDB, you can also use an in-memory store for a KTable that does not write to disk. This store would also have a changelog topic that tracks all updates to the store for fault-tolerance reasons.
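A sketch of that; the store name is arbitrary:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> table = builder.table(
    "topic2",
    Materialized.<String, String>as(Stores.inMemoryKeyValueStore("in-mem-store"))
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.String())); // a changelog topic is still created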

If you add a store manually via builder.addStateStore(), you can also add RocksDB or in-memory stores. In this case, you can enable changelogging for fault tolerance, similar to a KTable (note that when a KTable is created, it internally uses the exact same API -- ie, a KTable is a higher-level abstraction hiding some internal details).
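A sketch of that (store name and serdes are made up):

import java.util.HashMap;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

StreamsBuilder builder = new StreamsBuilder();
StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores
    .keyValueStoreBuilder(
        Stores.persistentKeyValueStore("my-store"), // or Stores.inMemoryKeyValueStore(...)
        Serdes.String(),
        Serdes.Long())
    .withLoggingEnabled(new HashMap<>()); // changelog topic for fault tolerance

builder.addStateStore(storeBuilder);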

For caching: this is implemented within Kafka Streams, on top of a store (either RocksDB or in-memory), and you can enable/disable it for "plain" stores you add manually, or for KTables. Compare https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html. Thus, this caching is independent of RocksDB's own caching.
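A sketch of the switches (store name is illustrative); the global knob is the cache.max.bytes.buffering config described in the linked docs:

import java.util.Properties;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

// Per store, when materializing a KTable:
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("key-value-store-name")
    .withCachingDisabled(); // or .withCachingEnabled()

// Per store, on a manually added StoreBuilder:
// storeBuilder.withCachingDisabled();

// Globally: a cache size of 0 disables the Streams cache for all stores.
Properties props = new Properties();
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);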

Answered Oct 04 '22 at 21:10 by Matthias J. Sax