Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

kafka streams - how to set a new key for KTable

I am new to Kafka Streams, I am using version 1.0.0. I would like to set a new key for a KTable from one of the values.

When using KStream, it cane be done by using method selectKey() like this.

kstream.selectKey ((k,v) -> v.newKey)

However such method is missing in KTable. Only way is to convert given KTable to KStream. Any thoughts on this issue? Its changing a key against design of KTable?

like image 835
Stefan Repcek Avatar asked Apr 15 '18 10:04

Stefan Repcek


2 Answers

If you want to set a new key, you need to re-group the KTable:

KTable newTable = table.groupBy(/*put select key function here*/)
                       .aggregate(...);

Because a key must be unique for a KTable (in contrast to a KStream) it's required to specify an aggregation function that aggregates all records with same (new) key into a single value.

Since Kafka 2.5, Kafka Streams also support KStream#toTable() operator. Thus, it is also possible to do table.toStream().selectKey(...).toTable(). There are advantages and disadvantages for both approaches.

The main disadvantage of using toTable() is that it will repartition the input data based on the new key, which leads to interleaves writes into the repartition topic and thus to out-of-order data. While the first approach via groupBy() uses the same implementation, using the aggregation function helps you to resolve "conflicts" expliclity. If you use the toTable() operator, an "blind" upsert based on offset order of the repartition topic is done (this is actually similar to the code example in the other answers).

Example:

Key | Value
 A  | (a,1)
 B  | (a,2)

If you re-key on a your output table would be either once of both (but it's not defined with one):

Key | Value          Key | Value
 a  | 1               a  |  2

The operation to "rekey" a table is semantically always ill-defined.

like image 150
Matthias J. Sax Avatar answered Sep 21 '22 13:09

Matthias J. Sax


@Matthias's answer led me down the right path, but I thought having a sample piece of code might help out here

final KTable<String, User> usersKeyedByApplicationIDKTable = usersKTable.groupBy(
        // First, going to set the new key to the user's application id
        (userId, user) -> KeyValue.pair(user.getApplicationID().toString(), user)
).aggregate(
        // Initiate the aggregate value
        () -> null,
        // adder (doing nothing, just passing the user through as the value)
        (applicationId, user, aggValue) -> user,
        // subtractor (doing nothing, just passing the user through as the value)
        (applicationId, user, aggValue) -> user
);

KGroupedTable aggregate() documentation: https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/KGroupedTable.html#aggregate-org.apache.kafka.streams.kstream.Initializer-org.apache.kafka.streams.kstream.Aggregator-org.apache.kafka.streams.kstream.Aggregator-org.apache.kafka.streams.kstream.Materialized-

like image 28
Allen Underwood Avatar answered Sep 19 '22 13:09

Allen Underwood