
Kafka KTable - shared aggregation across machines

Assume that I have a topic with numerous partitions. I'm writing K/V data to it and want to aggregate that data in tumbling windows by key.

Assume that I've launched as many worker instances as I have partitions and each worker instance is running on a separate machine.

How would I go about ensuring that the resultant aggregations include all values for each key? I.e., I don't want each worker instance to have only some subset of the values.

Is this something that a StateStore would be used for? Does Kafka manage this on its own or do I need to come up with a method?

ethrbunny asked Aug 31 '16 18:08



2 Answers

How would I go about ensuring that the resultant aggregations include all values for each key? I.e., I don't want each worker instance to have only some subset of the values.

In general, Kafka Streams ensures that all values for the same key will be processed by the same (and only one) stream task, which also means only one application instance (what you described as "worker instance") will process the values for that key. Note that an app instance may run one or more stream tasks, but these tasks are isolated from each other.

This behavior is achieved through the partitioning of the data, and Kafka Streams ensures that a partition is always processed by one and only one stream task. The logical link to keys/values is that, in Kafka and Kafka Streams, a key is always sent to the same partition (there is a gotcha here, but I'm not sure whether it makes sense to go into details for the scope of this question), hence one particular partition, among possibly many partitions, contains all the values for the same key.
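To make the key-to-partition link concrete, here is a minimal sketch in plain Java. It is not Kafka's actual partitioner code: the class name PartitionSketch and the use of Arrays.hashCode are assumptions made for illustration. The only point is that a deterministic hash of the serialized key, taken modulo the partition count, sends equal keys to the same partition.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration of key-based partitioning; not Kafka's real implementation.
final class PartitionSketch {

    // Hash the serialized key, then take the result modulo the number of partitions.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);       // assumption: any deterministic hash serves the illustration
        return (hash & 0x7fffffff) % numPartitions; // clear the sign bit so the index is never negative
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        // "alice" appears twice and is mapped to the same partition both times.
        for (String key : new String[] {"alice", "bob", "alice"}) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

Note that in this sketch the result depends on numPartitions, so the mapping is only stable as long as the partition count stays the same.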

In some situations, such as when joining two streams A and B, you must additionally ensure that both streams use the same key, so that data from both streams is co-located in the same stream task. Again, this is all about making the relevant input stream partitions, and thus the matching keys from A and B, available in the same stream task. A typical method you'd use here is selectKey(). Once that is done, Kafka Streams ensures that, for joining the two streams A and B as well as for creating the joined output stream, all values for the same key will be processed by the same stream task and thus the same application instance.

Example:

  • Stream A has key userId with value { georegion }.
  • Stream B has key georegion with value { continent, description }.

Joining two streams only works (as of Kafka 0.10.0) when both streams use the same key. In this example, this means that you must re-key (and thus re-partition) stream A so that the resulting key is changed from userId to georegion. Otherwise, as of Kafka 0.10, you can't join A and B because data is not co-located in the stream task that is responsible for actually performing the join.

In this example, you could re-key/re-partition stream A via:

// Kafka 0.10.0.x (latest stable release as of Sep 2016)
A.map((userId, georegion) -> KeyValue.pair(georegion, userId)).through("rekeyed-topic")

// Upcoming versions of Kafka (not released yet)
A.map((userId, georegion) -> KeyValue.pair(georegion, userId))

The through() call is only required in Kafka 0.10.0 to actually trigger re-partitioning; later versions of Kafka will do this automatically for you (this upcoming functionality is already completed and available in Kafka trunk).
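The effect of re-keying on co-location can be illustrated without a Kafka cluster. The sketch below is a plain-Java simulation, not Kafka Streams code: RekeySketch, the sample records, and the String.hashCode-based partition function are all assumptions for illustration, and it assumes both topics have the same number of partitions. After swapping key and value, the record from stream A carries the same key as the record from stream B and therefore maps to the same partition.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Hypothetical simulation of re-keying; it uses no Kafka classes.
final class RekeySketch {

    // Simplified stand-in for a key-based partitioner.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Swap key and value, mirroring map((userId, georegion) -> KeyValue.pair(georegion, userId)).
    static Map.Entry<String, String> rekey(Map.Entry<String, String> record) {
        return new SimpleEntry<>(record.getValue(), record.getKey());
    }

    public static void main(String[] args) {
        int numPartitions = 4; // assumption: both topics have this many partitions
        Map.Entry<String, String> a = new SimpleEntry<>("user-42", "emea"); // stream A: userId -> georegion
        Map.Entry<String, String> b = new SimpleEntry<>("emea", "Europe");  // stream B: georegion -> continent
        Map.Entry<String, String> rekeyedA = rekey(a);

        System.out.println("A keyed by userId    -> partition " + partitionFor(a.getKey(), numPartitions));
        System.out.println("A keyed by georegion -> partition " + partitionFor(rekeyedA.getKey(), numPartitions));
        System.out.println("B keyed by georegion -> partition " + partitionFor(b.getKey(), numPartitions));
    }
}
```

Before re-keying, nothing ties A's partition to B's partition. After re-keying, the last two lines always report the same partition, which is the co-location the join relies on.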

Is this something that a StateStore would be used for? Does Kafka manage this on its own or do I need to come up with a method?

In general, no. The behavior above is achieved through partitioning, not through state stores.

Sometimes state stores are involved because of the operations you have defined for a stream, which might explain why you were asking this question. For example, a windowing operation will require state to be managed, and thus a state store will be created behind the scenes. But your actual question, "ensuring that the resultant aggregations include all values for each key", has nothing to do with state stores; it is about the partitioning behavior.
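To show why a windowed aggregation needs state at all, here is a minimal plain-Java sketch of a tumbling-window count per key. It is not the Kafka Streams API: TumblingWindowSketch is a hypothetical class, the HashMap merely stands in for a state store, and timestamps are assumed to be non-negative milliseconds.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tumbling-window counter; the map plays the role of a local state store.
final class TumblingWindowSketch {
    private final long windowSizeMs;
    // State: "key@windowStart" -> running count for that key in that window.
    private final Map<String, Long> counts = new HashMap<>();

    TumblingWindowSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // A tumbling window starts at the largest multiple of the window size <= the timestamp.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Adds one record and returns the updated count for its key and window.
    long add(String key, long timestampMs) {
        String stateKey = key + "@" + windowStart(timestampMs, windowSizeMs);
        return counts.merge(stateKey, 1L, Long::sum);
    }

    public static void main(String[] args) {
        TumblingWindowSketch window = new TumblingWindowSketch(60_000L);
        System.out.println(window.add("alice", 1_000L));  // first record in window [0, 60000)
        System.out.println(window.add("alice", 59_999L)); // same window, count grows
        System.out.println(window.add("alice", 60_000L)); // next window, count restarts
    }
}
```

The map only ever needs to hold the keys that reach this one task, which is why the state follows from the partitioning rather than the other way around.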

Michael G. Noll answered Oct 17 '22 22:10


By "worker instance", I assume you mean a Kafka Streams application instance, right? (Because there is no master/worker pattern in Kafka Streams, which is a library and not a framework, we do not use the term "worker".)

If you want to co-locate data per key, you need to partition the data by key. Either your external producer partitions the data by key when it writes into the topic in the first place, or you explicitly set a new key within your Kafka Streams application (using, for example, selectKey() or map()) and re-distribute the data via a call to through(). (The explicit call to through() will not be necessary in future releases, i.e., 0.10.1; Kafka Streams will then re-distribute records automatically if necessary.)

If messages/records should be partitioned by key, the key must not be null. You can also change the partitioning scheme via the producer configuration partitioner.class (see https://kafka.apache.org/documentation.html#producerconfigs).
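As a rough sketch of where partitioner.class goes, the producer configuration could be assembled like this. The broker address and the class name com.example.GeoRegionPartitioner are hypothetical placeholders, not something from this thread; the named class would have to implement Kafka's Partitioner interface and be on the producer's classpath.

```java
import java.util.Properties;

// Hypothetical helper that only builds the configuration; it does not start a producer.
final class ProducerConfigSketch {

    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Hypothetical custom partitioner class, named here purely for illustration.
        props.put("partitioner.class", "com.example.GeoRegionPartitioner");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("partitioner.class"));
    }
}
```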

Partitioning is completely independent from StateStores, even if StateStores are usually used on top of partitioned data.

Matthias J. Sax answered Oct 17 '22 22:10