My stream has a column called 'category', and I have additional static metadata for each 'category' in a different store; it gets updated once every couple of days. What is the right way to do this lookup? There are two options with Kafka Streams:
1. Load static data outside of Kafka Streams and just use KStream#map() to add metadata. This is possible as Kafka Streams is just a library.
2. Load the metadata to a Kafka topic, load it to a KTable and do KStream#leftJoin(). This seems more natural and leaves partitioning etc. to Kafka Streams. However, this requires us to keep the KTable loaded with all the values. Note that we would have to load the entire lookup data, and not just the changes.
- Which of the above is the right way to look up metadata?
- Is it possible to always force just one stream to be read from the beginning on restarts, so that all the metadata can be loaded into the KTable?
- Is there another way using stores?
- Load static data outside of Kafka Streams and just use KStream#map() to add metadata. This is possible as Kafka Streams is just a library.
This works. But usually people opt for the next option you listed, because the side data to enrich the input stream with is typically not fully static; rather, it is changing but somewhat infrequently:
- Load the metadata to a Kafka topic, load it to a KTable and do KStream#leftJoin(), this seems more natural and leaves partitioning etc to Kafka Streams. However, this requires us to keep the KTable loaded with all the values. Note that we would have to load the entire lookup data, and not just the changes.
This is the usual approach, and I'd recommend sticking to it unless you have a specific reason not to.
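To make the join semantics concrete, here is a minimal plain-Java model of what a stream-table left join does per record. It is only a sketch and does not use the Kafka Streams API: the class name `LeftJoinSketch`, the sample categories, and the output format are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a stream-table left join; NOT the Kafka Streams API.
final class LeftJoinSketch {

    // Each event is (category, payload). For every event we look up the
    // metadata for its key in the table; a left join still emits the event
    // when no metadata exists, with a null on the table side.
    static List<String> enrich(List<Map.Entry<String, String>> events,
                               Map<String, String> table) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> event : events) {
            String metadata = table.get(event.getKey()); // null if key is absent
            out.add(event.getValue() + " [" + metadata + "]");
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> table = new HashMap<>();
        table.put("books", "dept=media"); // hypothetical metadata row

        List<Map.Entry<String, String>> events = List.of(
                Map.entry("books", "order-1"),
                Map.entry("toys", "order-2")); // no metadata for "toys"

        enrich(events, table).forEach(System.out::println);
    }
}
```

The point of the sketch is only that unmatched stream records are kept rather than dropped, which is why a left join (and not an inner join) is the natural fit for enrichment.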
However, this requires us to keep the KTable loaded with all the values. Note that we would have to load the entire lookup data, and not just the changes.
So I guess you also prefer the second option, but you are concerned about whether or not this is efficient.
Short answer: yes, the KTable will be loaded with all the (latest) values per key. The table will contain the entire lookup data, but keep in mind that the KTable is partitioned behind the scenes: if, for example, your input topic (for the table) has 3 partitions, then you can run up to 3 instances of your application, each of which gets 1 partition of the table (assuming data is spread evenly across partitions, each partition/shard of the table would hold about 1/3 of the table's data). So in practice, more likely than not, it "just works". I share more details below.
Global KTables: Alternatively, you can use global KTables instead of the normal (partitioned) table variant. With global tables, every instance of your application has a full copy of the table data. This makes global tables very useful for join scenarios, including for enriching a KStream as per your question.
Is it possible to always force just one stream to be read from the beginning on restarts, this is so that all the metadata can be loaded into KTable.
You don't need to worry about that. Simply put, if there is no local "copy" of the table available, then the Streams API would automatically ensure that the table's data is read fully from scratch. If there is a local copy available, then your application will re-use that copy (and update its local copy whenever new data is available in the table's input topic).
Longer answer with examples
Imagine the following input data (think: changelog stream) for your KTable; note how this input consists of 6 messages:
(alice, 1) -> (bob, 40) -> (alice, 2) -> (charlie, 600) -> (alice, 5) -> (bob, 22)
And here are the various states of the "logical" KTable that would result from this input, where each newly received input message (such as (alice, 1)) results in a new state of the table:
Key Value
--------------
alice | 1 // (alice, 1) received
|
V
Key Value
--------------
alice | 1
bob | 40 // (bob, 40) received
|
V
Key Value
--------------
alice | 2 // (alice, 2) received
bob | 40
|
V
Key Value
--------------
alice | 2
bob | 40
charlie | 600 // (charlie, 600) received
|
V
Key Value
--------------
alice | 5 // (alice, 5) received
bob | 40
charlie | 600
|
V
Key Value
--------------
alice | 5
bob | 22 // (bob, 22) received
charlie | 600
What you can see here is that, even though the input data may have many, many messages (or "changes" as you said; here, we have 6), the number of entries/rows in the resulting KTable (which is undergoing continuous mutations based on the newly received input) is the number of unique keys in the input (here: starting out with 1, ramping up to 3), which typically is significantly less than the number of messages. So, if the number of messages in the input is N and the number of unique keys for these messages is M, then typically M << N (M is significantly smaller than N; plus, for the record, we have the invariant M <= N).
This is the first reason why "this requires us to keep the KTable loaded with all the values" is typically not an issue, because only the latest value is retained per key.
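The table states above can be reproduced with a small plain-Java model that applies each changelog message as an upsert. This is a sketch of the semantics only, not Kafka Streams code; the class name `ChangelogSketch` is invented for the example.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of changelog semantics; NOT the Kafka Streams API.
final class ChangelogSketch {

    // Apply every message as an upsert: a later value for the same key
    // replaces the earlier one, so the table holds one row per unique key.
    static Map<String, Integer> materialize(List<Map.Entry<String, Integer>> changelog) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> message : changelog) {
            table.put(message.getKey(), message.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> changelog = List.of(
                Map.entry("alice", 1), Map.entry("bob", 40), Map.entry("alice", 2),
                Map.entry("charlie", 600), Map.entry("alice", 5), Map.entry("bob", 22));

        Map<String, Integer> table = materialize(changelog);
        System.out.println("N (messages) = " + changelog.size()); // 6
        System.out.println("M (rows)     = " + table.size());     // 3
        System.out.println(table); // {alice=5, bob=22, charlie=600}
    }
}
```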
The second reason that helps is that, as Matthias J. Sax has pointed out, Kafka Streams uses RocksDB as the default storage engine for such tables (more precisely: the state stores that back a table). RocksDB allows you to maintain tables that are larger than the available main memory / Java heap space of your application because it can spill to local disk.
Lastly, the third reason is that a KTable is partitioned. So, if your input topic for the table is (say) configured with 3 partitions, then what's happening behind the scenes is that the KTable itself is partitioned (think: sharded) in the same way. In the example above, here's what you could end up with, though the exact "splits" depend on how the original input data is spread across the partitions of the table's input topic:
Logical KTable (last state of what I showed above):
Key Value
--------------
alice | 5
bob | 22
charlie | 600
Actual KTable, partitioned (assuming 3 partitions for the table's input topic, plus keys=usernames being spread evenly across partitions):
Key Value
--------------
alice | 5 // Assuming that all data for `alice` is in partition 1
Key Value
--------------
bob | 22 // ...for `bob` is in partition 2
Key Value
--------------
charlie | 600 // ...for `charlie` is in partition 3
In practice, this partitioning of the input data -- among other things -- allows you to "size" the actual manifestations of a KTable.
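As a rough illustration of the sharding, the sketch below splits the logical table into per-partition maps. Note the assumption: it uses `String.hashCode()` modulo the partition count as a stand-in, whereas Kafka's default partitioner hashes the serialized key bytes with murmur2, so the actual key-to-partition placement will differ. The class name `ShardingSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a partitioned table; NOT the Kafka Streams API.
final class ShardingSketch {

    // Stand-in partitioner (assumption): Kafka's default uses murmur2 on the
    // serialized key, not String.hashCode(). Only the principle is the same:
    // a given key always lands in the same partition.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Split the logical table into one map per partition (shard).
    static List<Map<String, Integer>> shard(Map<String, Integer> table, int numPartitions) {
        List<Map<String, Integer>> shards = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            shards.add(new HashMap<>());
        }
        for (Map.Entry<String, Integer> row : table.entrySet()) {
            shards.get(partitionFor(row.getKey(), numPartitions))
                  .put(row.getKey(), row.getValue());
        }
        return shards;
    }

    public static void main(String[] args) {
        Map<String, Integer> table = Map.of("alice", 5, "bob", 22, "charlie", 600);
        List<Map<String, Integer>> shards = shard(table, 3);
        for (int i = 0; i < shards.size(); i++) {
            System.out.println("partition " + i + ": " + shards.get(i));
        }
    }
}
```

Each application instance only has to hold the shards for the partitions assigned to it, which is what keeps the per-instance table size bounded.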
Another example, assuming the latest state of your KTable has a total size of about 1 TB:
- If the table's input topic has only 1 partition, then the KTable itself also has only 1 partition, with a size of 1 TB. Here, because the input topic has but 1 partition, you could run your application with at most 1 app instance (so not really a whole lot of parallelism, heh).
- If the table's input topic has 500 partitions, then the KTable has 500 partitions, too, with a size of ~ 2 GB each (assuming data is evenly spread across the partitions). Here, you could run your application with up to 500 app instances. If you were to run exactly 500 instances, then each app instance would get exactly 1 partition/shard of the logical KTable, thus ending up with 2 GB of table data; if you were to run only 100 instances, then each instance would get 500 / 100 = 5 partitions/shards of the table, ending up with about 2 GB * 5 = 10 GB of table data.

Your overall observation is correct, and it depends on which tradeoffs are more important for you. If your metadata is small, option 1 seems to be the better one. If the metadata is big, option 2 seems to be the way to go.
If you use map(), you need to have a complete copy of your metadata in each application instance (as you cannot know exactly how Streams will partition your KStream data). Thus, if your metadata does not fit into main memory, using map() would not work easily.
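For the map() option, one possible shape is an in-memory snapshot of the full metadata that each application instance loads itself and refreshes on its own schedule. The sketch below is plain Java under stated assumptions: `MetadataCache`, the `loader` supplier (standing in for whatever reads your external store), and the "unknown" fallback are all hypothetical, and nothing here is part of Kafka Streams.

```java
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java sketch of a per-instance metadata snapshot; NOT the Kafka Streams API.
final class MetadataCache {

    // Hypothetical loader that reads the complete metadata from the external store.
    private final Supplier<Map<String, String>> loader;

    // Full, immutable copy of the metadata; volatile so that a refresh on
    // another thread becomes visible to the threads doing the enrichment.
    private volatile Map<String, String> snapshot;

    MetadataCache(Supplier<Map<String, String>> loader) {
        this.loader = loader;
        this.snapshot = Map.copyOf(loader.get());
    }

    // Call this on your own schedule, e.g. every couple of days.
    void refresh() {
        snapshot = Map.copyOf(loader.get());
    }

    // What the function passed to map() would do for each record.
    String enrich(String category, String payload) {
        return payload + " [" + snapshot.getOrDefault(category, "unknown") + "]";
    }
}
```

Because every instance holds the whole snapshot, memory use grows with the metadata size rather than with the number of partitions an instance owns, which is the tradeoff described above.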
If you use KTable, Streams will take care that metadata is sharded correctly over all running application instances, such that no data duplication is required. Furthermore, a KTable uses RocksDB as state store engine and thus can spill to disk.
EDIT BEGIN
About having all data in KTable: if you have two categories for the same key, the second value would overwrite the first value if you read the data directly from the topic into a KTable via builder.table(...) (changelog semantics). However, you can work around this easily by reading the topic as a record stream (i.e., builder.stream(...)) and applying an aggregation to compute the KTable. Your aggregation would simply emit a list of all values for each key.
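The difference between the two ways of reading the topic can be modeled in plain Java: instead of overwriting, the aggregation appends each value to a per-key list. This is a sketch of the idea only, not Kafka Streams code; `CollectValuesSketch` and the sample keys are made up.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of "aggregate all values per key"; NOT the Kafka Streams API.
final class CollectValuesSketch {

    // Record-stream semantics: every record is an independent fact, so the
    // aggregation appends the value to the key's list instead of replacing it.
    static Map<String, List<String>> aggregate(List<Map.Entry<String, String>> records) {
        Map<String, List<String>> table = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : records) {
            table.computeIfAbsent(record.getKey(), k -> new ArrayList<>())
                 .add(record.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = List.of(
                Map.entry("item-1", "books"),
                Map.entry("item-2", "toys"),
                Map.entry("item-1", "gifts")); // second category for item-1

        System.out.println(aggregate(records)); // {item-1=[books, gifts], item-2=[toys]}
    }
}
```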
About deleting: KTable uses changelog semantics and does understand tombstone messages to delete key-value pairs. Thus, if you read a KTable from a topic and the topic contains a <key:null> message, the current record in the KTable with this key will get deleted. This is harder to achieve when the KTable is the result of an aggregation, because an aggregation input record with a null key or null value will simply get ignored and does not update the aggregation result.
The workaround would be to add a map() step before the aggregation and introduce a NULL value (i.e., a user-defined "object" that represents the tombstone but is not null; in your case, you could call it a null-category). In your aggregation, you just return a null value as the aggregation result if the input record has null-category as its value. This will then translate into a tombstone message for your KTable and delete the current list of categories for this key.
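Here is a plain-Java model of that workaround, to show the control flow. It is a sketch under assumptions and not Kafka Streams code: the sentinel string `NULL_CATEGORY`, the class name `TombstoneSketch`, and the use of `String[]` pairs for records are all invented for the example, and removing the row from the map stands in for the aggregation returning null.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the sentinel-tombstone workaround; NOT the Kafka Streams API.
final class TombstoneSketch {

    // Hypothetical sentinel: a non-null value that stands for "delete this key".
    static final String NULL_CATEGORY = "__null-category__";

    // The map() step: replace a null value with the sentinel so that the
    // aggregation sees the record instead of ignoring it.
    static String mapValue(String value) {
        return value == null ? NULL_CATEGORY : value;
    }

    // Each record is a {key, value} pair; value may be null (delete request).
    static Map<String, List<String>> aggregate(List<String[]> records) {
        Map<String, List<String>> table = new LinkedHashMap<>();
        for (String[] record : records) {
            String key = record[0];
            String value = mapValue(record[1]);
            if (NULL_CATEGORY.equals(value)) {
                // Models "return null from the aggregation": the row is deleted.
                table.remove(key);
            } else {
                table.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<String[]> records = List.of(
                new String[] {"item-1", "books"},
                new String[] {"item-1", "gifts"},
                new String[] {"item-2", "toys"},
                new String[] {"item-1", null}); // delete request for item-1

        System.out.println(aggregate(records)); // {item-2=[toys]}
    }
}
```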
EDIT END
And of course you can always build a custom solution via the Processor API. However, if the DSL can give you what you need, there is no good reason to do this.