What is the difference between these entities?
As I understand it, a KTable is a simple Kafka topic with the compaction cleanup policy. Also, if logging is enabled for a KTable, then there is also a changelog, and then the cleanup policy is compact,delete.
Local store: an in-memory key-value cache based on RocksDB. But a local store also has a changelog.
In both cases, we get the last value for a key for a certain period of time (?). A local store is used for aggregation steps, joins, etc. But a new topic with the compaction strategy is also created for it.
For example:
KStream<K, V> source = builder.stream(topic1);
KTable<K, V> table = builder.table(topic2); // What will happen here if I read data from a topic with cleanup policy compact,delete? Will an additional topic be created to store the data, or will just a local store (cache) be used for it?
// or
KTable<K, V> table2 = builder.table(..., Materialized.as("key-value-store-name")); // What will happen here? As I understand it, I just specified a concrete name for the local store and now I can query it as a regular key-value store
source.groupByKey().aggregate(initialValue, aggregationLogic, Materialized.as(...)); // Will a new aggregation topic with the compact cleanup policy be created here? Or will only a local store be used?
Also, I can create a state store using builder.addStateStore(...), where I can enable/disable logging (the changelog) and caching (???).
I've read this: https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html, but some details are still unclear to me, especially the case where we can disable the Streams cache (but not the RocksDB cache) and we get something like a full CDC feed of a relational database.
A KTable is an abstraction of a changelog stream, where each data record represents an update. More precisely, the value in a data record is interpreted as an “UPDATE” of the last value for the same record key, if any (if a corresponding key doesn't exist yet, the update will be considered an INSERT).
A KStream handles a stream of records. A KTable, on the other hand, manages a changelog stream with the latest state for a given key; each data record in a KTable represents an update. In that sense, a KStream is stateless, whereas a KTable is stateful.
A KTable's state is stored in RocksDB (which is not purely in memory; it can spill to disk). When a KTable receives a record with a null value (a tombstone), it deletes that key from RocksDB and the space is freed up.
A KTable is a logical abstraction of a table that is updated over time. Additionally, you can think of it not as a materialized table, but as a changelog stream that consists of all update records to the table. Compare https://docs.confluent.io/current/streams/concepts.html#duality-of-streams-and-tables. Hence, conceptually a KTable is something of a hybrid if you wish; however, it's easier to think of it as a table that is updated over time.
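For example, a rough sketch of the two interpretations (the topic names are just placeholders, and default serdes are assumed):

StreamsBuilder builder = new StreamsBuilder();
// A KStream treats every record as an independent fact ("INSERT" semantics).
KStream<String, String> facts = builder.stream("user-clicks");
// A KTable treats each record as an UPSERT for its key; a null value is a tombstone that deletes the key.
KTable<String, String> latestPerKey = builder.table("user-profiles");
// The duality: the table can always be viewed again as its changelog stream.
KStream<String, String> profileUpdates = latestPerKey.toStream();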
Internally, a KTable is implemented using RocksDB and a topic in Kafka. RocksDB stores the current data of the table (note that RocksDB is not an in-memory store and can write to disk). At the same time, each update to the KTable (i.e., to RocksDB) is written into the corresponding Kafka topic. That topic is used for fault-tolerance reasons (note that RocksDB itself is considered ephemeral; writing to disk via RocksDB does not provide fault tolerance, the changelog topic does), and it is configured with log compaction enabled to make sure that the latest state of RocksDB can be restored by reading from the topic.
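To make this concrete for the aggregation case from your example, here is a minimal sketch (topic, store name, and serdes are placeholders; usual org.apache.kafka.streams imports assumed):

KTable<String, Long> counts = builder
    .stream("orders", Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey()
    .aggregate(
        () -> 0L,                                   // initializer
        (key, value, aggregate) -> aggregate + 1L,  // aggregation logic
        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("orders-agg-store")
            .withValueSerde(Serdes.Long()));
// The current values live in a local RocksDB store; every update is also written to the
// internal changelog topic "<application.id>-orders-agg-store-changelog", which is
// created with cleanup.policy=compact by default.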
If you have a KTable that is created by a windowed aggregation, the Kafka topic is configured with compact,delete to expire old data (i.e., old windows), so that the table (i.e., RocksDB) does not grow unbounded.
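A minimal sketch of such a windowed aggregation (topic, window size, and store name are placeholders; newer Kafka Streams versions use TimeWindows.ofSizeWithNoGrace(...) instead of TimeWindows.of(...)):

KTable<Windowed<String>, Long> viewsPerWindow = builder
    .stream("page-views", Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
    .count(Materialized.as("page-view-counts"));
// The backing changelog topic for this windowed store uses cleanup.policy=compact,delete,
// so records for expired windows are eventually removed from the topic as well.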
Instead of RocksDB, you can also use an in-memory store for a KTable that does not write to disk. This store would also have a changelog topic that tracks all updates to the store for fault-tolerance reasons.
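For example, a sketch of backing a table with an in-memory store instead of RocksDB (names and serdes are placeholders); note that the changelog topic is still created:

KTable<String, String> inMemTable = builder.table(
    "user-profiles",
    Consumed.with(Serdes.String(), Serdes.String()),
    Materialized.<String, String>as(Stores.inMemoryKeyValueStore("profiles-in-mem"))
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.String()));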
If you add a store manually via builder.addStateStore(), you can add RocksDB or in-memory stores as well. In this case, you can enable changelogging for fault tolerance, similar to a KTable (note that when a KTable is created, it internally uses the exact same API -- i.e., a KTable is a higher-level abstraction hiding some internal details).
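A minimal sketch of adding a store manually (store name and serdes are placeholders):

StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
    Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("my-manual-store"),  // or Stores.inMemoryKeyValueStore(...)
            Serdes.String(),
            Serdes.Long())
        .withLoggingEnabled(Collections.emptyMap());            // extra changelog topic configs go here; withLoggingDisabled() turns the changelog off
builder.addStateStore(storeBuilder);
// The store can then be used from a Processor/Transformer that declares "my-manual-store".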
For caching: this is implemented within Kafka Streams on top of a store (either RocksDB or in-memory), and you can enable/disable it for "plain" stores you add manually, or for KTables. Compare https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html. Thus, this caching is independent of RocksDB's own caching.
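A sketch of the two switches involved (store name is a placeholder): disabling the record cache means every single update is forwarded downstream and written to the changelog, which is the "full CDC-like feed" case mentioned in the question.

Materialized<String, Long, KeyValueStore<Bytes, byte[]>> materialized =
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")
        .withCachingDisabled();   // or .withCachingEnabled()

// The overall record-cache size is controlled via the Streams config
// "cache.max.bytes.buffering" (0 disables caching globally).
Properties props = new Properties();
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);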