
Ideal way to enrich a KStream with lookup data

My stream has a column called 'category', and I have additional static metadata for each 'category' in a different store; that store gets updated once every couple of days. What is the right way to do this lookup? There are two options with Kafka Streams:

  1. Load static data outside of Kafka Streams and just use KStreams#map() to add metadata. This is possible as Kafka Streams is just a library.

  2. Load the metadata to a Kafka topic, load it to a KTable and do KStreams#leftJoin(), this seems more natural and leaves partitioning etc to Kafka Streams. However, this requires us to keep the KTable loaded with all the values. Note that we would have to load the entire lookup data, and not just the changes.

    • For example, say initially there was just one category, 'c1'. The Kafka Streams app was stopped gracefully and then restarted. After the restart, a new category 'c2' was added. My assumption is that table = KStreamBuilder().table('metadataTopic') would just have the value 'c2', as that was the only thing that changed since the app started for the second time. I would want it to have both 'c1' and 'c2'.
    • If it does have 'c1' as well, would the data ever be removed from the KTable (perhaps by sending a <key, null> message, i.e., a tombstone)?

Which of the above is the right way to lookup metadata?

Is it possible to always force just one stream to be read from the beginning on restarts, so that all the metadata can be loaded into the KTable?

Is there another way using stores?

asked Dec 08 '16 by Vignesh Chandramohan



2 Answers

  1. Load static data outside of Kafka Streams and just use KStreams#map() to add metadata. This is possible as Kafka Streams is just a library.

This works. But usually people opt for the next option you listed, because the side data to enrich the input stream with is typically not fully static; rather, it changes, if somewhat infrequently:

  2. Load the metadata to a Kafka topic, load it to a KTable and do KStreams#leftJoin(), this seems more natural and leaves partitioning etc to Kafka Streams. However, this requires us to keep the KTable loaded with all the values. Note that we would have to load the entire lookup data, and not just the changes.

This is the usual approach, and I'd recommend sticking to it unless you have a specific reason not to.
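
For concreteness, here is a minimal sketch of this option using the Java DSL (the newer StreamsBuilder API rather than the KStreamBuilder from the question); the topic names, serde configuration, and enrichment logic are assumptions. Note that the stream must be keyed by category and co-partitioned with the table for the join to work, which is why the sketch re-keys it first:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class EnrichmentTopology {

  public static StreamsBuilder build() {
    StreamsBuilder builder = new StreamsBuilder();

    // Metadata changelog topic, keyed by category (assumed topic name).
    KTable<String, String> categoryMetadata = builder.table("category-metadata");

    // Re-key the event stream by category so it is co-partitioned with the
    // table; Kafka Streams inserts a repartition step automatically.
    KStream<String, String> enriched = builder.<String, String>stream("events")
        .selectKey((key, value) -> extractCategory(value))
        .leftJoin(categoryMetadata,
            (eventValue, metadataValue) ->
                eventValue + " | " + (metadataValue != null ? metadataValue : "no-metadata"));

    enriched.to("enriched-events");
    return builder;
  }

  // Hypothetical helper: pull the category out of the event payload.
  private static String extractCategory(String eventValue) {
    return eventValue.split(",")[0];
  }
}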

However, this requires us to keep the KTable loaded with all the values. Note that we would have to load the entire lookup data, and not just the changes.

So I guess you also prefer the second option, but you are concerned about whether or not this is efficient.

Short answer is: Yes, the KTable will be loaded with all the (latest) values per key. The table will contain the entire lookup data, but keep in mind that the KTable is partitioned behind the scenes: if, for example, your input topic (for the table) has 3 partitions, then you can run up to 3 instances of your application, each of which gets 1 partition of the table (assuming data is spread evenly across partitions, each partition/shard of the table would hold about 1/3 of the table's data). So in practice, more likely than not, it "just works". I share more details below.

Global KTables: Alternatively, you can use global KTables instead of the (partitioned) normal table variant. With global tables every instance of your application has a full copy of the table data. This makes global tables very useful for join scenarios, including for enriching a KStream as per your question.
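
A minimal sketch of the global variant, under the same assumptions as above; unlike the partitioned join, no re-keying or co-partitioning is needed, because the KeyValueMapper extracts the table key from each stream record (extractCategory is the same hypothetical helper):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();

// Every application instance holds a full copy of this table.
GlobalKTable<String, String> categoryMetadata = builder.globalTable("category-metadata");

KStream<String, String> enriched = builder.<String, String>stream("events")
    .leftJoin(categoryMetadata,
        (eventKey, eventValue) -> extractCategory(eventValue),  // record -> table key
        (eventValue, metadataValue) -> eventValue + " | " + metadataValue);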

Is it possible to always force just one stream to be read from the beginning on restarts, so that all the metadata can be loaded into the KTable?

You don't need to worry about that. Simply put, if there is no local "copy" of the table available, then the Streams API would automatically ensure that the table's data is read fully from scratch. If there is a local copy available, then your application will re-use that copy (and update its local copy whenever new data is available in the table's input topic).

Longer answer with examples

Imagine the following input data (think: changelog stream) for your KTable; note how this input consists of 6 messages:

(alice, 1) -> (bob, 40) -> (alice, 2) -> (charlie, 600) -> (alice, 5) -> (bob, 22)

And here are the various states of the "logical" KTable that would result from this input, where each newly received input message (such as (alice, 1)) results in a new state of the table:

Key      Value
--------------
alice   |   1    // (alice, 1) received

 |
 V

Key      Value
--------------
alice   |   1
bob     |  40    // (bob, 40) received

 |
 V

Key      Value
--------------
alice   |   2    // (alice, 2) received
bob     |  40

 |
 V

Key      Value
--------------
alice   |   2
bob     |  40
charlie | 600    // (charlie, 600) received

 |
 V

Key      Value
--------------
alice   |   5    // (alice, 5) received
bob     |  40
charlie | 600

 |
 V

Key      Value
--------------
alice   |   5
bob     |  22    // (bob, 22) received
charlie | 600

What you can see here is that, even though the input data may have many, many messages (or "changes" as you said; here, we have 6), the number of entries/rows in the resulting KTable (which is undergoing continuous mutations based on the newly received input) is the number of unique keys in the input (here: starting out with 1, ramping up to 3), which typically is significantly less than the number of messages. So, if the number of messages in the input is N and the number of unique keys for these messages is M, then typically M << N (M is significantly smaller than N; plus, for the record, we have the invariant M <= N).

This is the first reason why "this requires us to keep the KTable loaded with all the values" is typically not an issue, because only the latest value is retained per key.

The second reason that helps is that, as Matthias J. Sax has pointed out, Kafka Streams uses RocksDB as the default storage engine for such tables (more precisely: the state stores that back a table). RocksDB allows you to maintain tables that are larger than the available main memory / Java heap space of your application because it can spill to local disk.
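
If you want to control or query that store explicitly, you can name it when constructing the table; a sketch using the Materialized API from newer Kafka Streams versions (the store and topic names are assumptions):

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

StreamsBuilder builder = new StreamsBuilder();

// Backed by a persistent RocksDB store by default, so the table can grow
// beyond the JVM heap and spill to local disk.
KTable<String, String> metadata = builder.table(
    "category-metadata",
    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("metadata-store"));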

Lastly, the third reason is that a KTable is partitioned. So, if your input topic for the table is (say) configured with 3 partitions, then what's happening behind the scenes is that the KTable itself is partitioned (think: sharded) in the same way. In the example above, here's what you could end up with, though the exact "splits" depend on how the original input data is spread across the partitions of the table's input topic:

Logical KTable (last state of what I showed above):

Key      Value
--------------
alice   |   5
bob     |  22
charlie | 600

Actual KTable, partitioned (assuming 3 partitions for the table's input topic, plus keys=usernames being spread evenly across partitions):

Key      Value
--------------
alice   |   5    // Assuming that all data for `alice` is in partition 1

Key      Value
--------------
bob     |  22    // ...for `bob` is in partition 2

Key      Value
--------------
charlie | 600    // ...for `charlie` is in partition 3

In practice, this partitioning of the input data -- among other things -- allows you to "size" the actual manifestations of a KTable.

Another example:

  • Imagine the latest state of your KTable would typically have a size of 1 TB (again, the approximate size is a function of the number of unique message keys in the table's input data, multiplied by the average size of the associated message value).
  • If the table's input topic has only 1 partition, then the KTable itself also has only 1 partition, with a size of 1 TB. Here, because the input topic has but 1 partition, you could run your application with up to 1 app instance (so not really a whole lot of parallelism, heh).
  • If the table's input topic has 500 partitions, then the KTable has 500 partitions, too, with a size of ~ 2 GB each (assuming data is evenly spread across the partitions). Here, you could run your application with up to 500 app instances. If you were to run exactly 500 instances, then each app instance would get exactly 1 partition/shard of the logical KTable, thus ending up with 2 GB of table data; if you were to run only 100 instances, then each instance would get 500 / 100 = 5 partitions/shards of the table, ending up with about 2 GB * 5 = 10 GB of table data.
answered Oct 03 '22 by Michael G. Noll


Your overall observation is correct, and it depends on which trade-offs are more important for you. If your metadata is small, option 1 seems to be the better one. If the metadata is big, option 2 is the way to go.

If you use map(), you need to have a complete copy of your metadata in each application instance (as you cannot know exactly how Streams will partition your KStream data). Thus, if your metadata does not fit into main memory, using map() would not work easily.
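
To illustrate, a sketch of option 1 with an assumed loader that reads the complete metadata into a local map at startup (MetadataLoader and extractCategory are hypothetical helpers):

import java.util.Map;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

// Hypothetical: every instance loads the FULL metadata set, since records
// for any category may be routed to any instance.
Map<String, String> metadataByCategory = MetadataLoader.loadAll();

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> enriched = builder.<String, String>stream("events")
    .mapValues(value ->
        value + " | " + metadataByCategory.getOrDefault(extractCategory(value), "n/a"));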

If you use KTable, Streams will take care that the metadata is sharded correctly over all running application instances, so that no data duplication is required. Furthermore, a KTable uses RocksDB as its state store engine and thus can spill to disk.

EDIT BEGIN

About having all data in the KTable: if you have two categories for the same key, the second value would overwrite the first if you read the data directly from the topic into a KTable via builder.table(...) (changelog semantics). However, you can work around this easily by reading the topic as a record stream (i.e., builder.stream(...)) and applying an aggregation to compute the KTable. Your aggregation would simply emit a list of all values for each key, as sketched below.
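
A sketch of that aggregation; note that Kafka Streams does not ship a serde for List<String>, so the listSerde below is an assumed custom serde you would have to provide:

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

StreamsBuilder builder = new StreamsBuilder();

Serde<List<String>> listSerde = MySerdes.listOfStrings();  // hypothetical custom serde

Initializer<List<String>> emptyList = ArrayList::new;
Aggregator<String, String, List<String>> addValue =
    (key, value, list) -> { list.add(value); return list; };

// Read the topic as a record stream, so earlier values are NOT overwritten
// by later ones; instead, collect all values per key into a list.
KTable<String, List<String>> categoriesPerKey = builder
    .stream("metadataTopic", Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey()
    .aggregate(emptyList, addValue, Materialized.with(Serdes.String(), listSerde));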

About deleting: a KTable uses changelog semantics and understands tombstone messages for deleting key-value pairs. Thus, if you read a KTable from a topic and the topic contains a <key:null> message, the current record in the KTable with this key will get deleted. This is harder to achieve when the KTable is the result of an aggregation, because an aggregation input record with a null key or null value is simply ignored and does not update the aggregation result.
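
For the direct builder.table(...) case, deleting a key is therefore just a matter of writing a tombstone to the input topic, e.g. from an assumed, already-configured producer:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// 'producer' is an already-configured KafkaProducer<String, String>.
// A null value is a tombstone: it deletes key "c1" from any KTable
// that is materialized directly from this topic.
producer.send(new ProducerRecord<>("metadataTopic", "c1", null));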

The workaround for the aggregation case would be to add a map() step before the aggregation and introduce a NULL value (i.e., a user-defined "object" that represents the tombstone but is not null -- in your case, you could call it a null-category). In your aggregation, you just return a null value as the aggregation result if the input record has null-category as its value. This will then translate into a tombstone message for your KTable and delete the current list of categories for this key. A sketch follows.
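
Building on the aggregation above, with NULL_CATEGORY as the user-defined sentinel (its concrete value is an assumption):

import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.KStream;

// User-defined sentinel: a non-null value that *means* "delete". It cannot be
// null itself, because null-valued records are ignored by the aggregation.
final String NULL_CATEGORY = "__null_category__";

// map() step before the aggregation: turn incoming tombstones into the sentinel.
KStream<String, String> withSentinel = builder
    .<String, String>stream("metadataTopic")
    .map((key, value) -> KeyValue.pair(key, value == null ? NULL_CATEGORY : value));

// In the aggregation, returning null translates into a tombstone for the
// KTable and deletes the current list of categories for this key.
Aggregator<String, String, List<String>> addOrDelete = (key, value, list) -> {
    if (NULL_CATEGORY.equals(value)) {
        return null;
    }
    list.add(value);
    return list;
};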

EDIT END

And of course you can always build a custom solution via the Processor API. However, if the DSL gives you what you need, there is no good reason to do this.

answered Oct 03 '22 by Matthias J. Sax