Join on foreign key in Kafka stream

Let's say that I have three Kafka topics filled with business events occurring in different aggregates (an event sourcing application). These events allow building aggregates with the following attributes:

  • users : userId, name
  • modules of an application : moduleId, name
  • grants of users for modules of application : grantId, userId, moduleId, scope

Now I want to create a stream of all grants with the names of users and modules (instead of their ids). I thought of doing it this way:

  1. Create a KTable for users by grouping events by userId. The KTable has userId as key. This works.
  2. Create a KTable for modules by grouping events by moduleId. The KTable has moduleId as key. This works.
  3. Create a stream from the stream of grants and join it with the two KTables. This does not work. The problem is that joins seem to be possible only on primary keys. But the key of the grant stream is a technical identifier of the grant, while the keys of the users and modules tables are not (they know nothing about grants).

So how should I proceed?

asked Nov 12 '18 11:11 by gentiane


2 Answers

There is no direct support for foreign-key joins in Kafka Streams at the moment.
There is an open JIRA issue for it: https://issues.apache.org/jira/browse/KAFKA-3705

For now, you can work around this with KStream-KTable joins.

First, aggregate the user stream and the module stream into their respective KTables, keeping the latest event (or a collection of events) per key.

// One table per aggregate, keyed by its own id
KTable<String,Object> userTable = userStream.groupBy(<UserId>).aggregate(<... build collection/latest event>);
KTable<String,Object> moduleTable = moduleStream.groupBy(<ModuleId>).aggregate(<... build collection/latest event>);

Now select the moduleId as the key of the grants stream.

// Rekey each grant from grantId to its moduleId
KStream<String,Object> grantRekeyedStream = grantStream.selectKey(<moduleId>);

This changes the key to moduleId. You can now perform a stream-table join with moduleTable: each grant record is joined with the module currently stored in the table under the same key. The resulting stream carries grant and module data together, with moduleId as key.

// The join also needs a ValueJoiner that merges the two values
KStream<String,Object> grantModuleStream = grantRekeyedStream.join(moduleTable, <ValueJoiner merging grant and module>);

The next step is to join with userTable, so you need to rekey grantModuleStream again, this time with userId.

KStream<String,Object> grantModuleRekeyedStream = grantModuleStream.selectKey(<Select UserId>);

Now grantModuleRekeyedStream can be joined with userTable, again with a KStream-KTable join.

KStream<String,Object> grantModuleUserStream = grantModuleRekeyedStream.join(userTable, <ValueJoiner merging grant/module and user>);

The resulting stream has userId as key, and each record contains a grant together with the details of its module and its user.
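
To make the behaviour of these two chained joins concrete, here is a minimal plain-Java sketch of what they compute. It is not Kafka Streams code: the two maps stand in for the KTables, the list stands in for the output stream, and every class, field and method name (GrantEnrichmentSketch, Grant, onGrant, ...) is made up for illustration. It assumes inner joins and that each table holds only the latest name per key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the rekey-and-join workaround (no Kafka dependency).
// All class, field, and method names here are hypothetical.
class GrantEnrichmentSketch {

    // Hypothetical value type for one grant event.
    static final class Grant {
        final String grantId;
        final String userId;
        final String moduleId;
        final String scope;

        Grant(String grantId, String userId, String moduleId, String scope) {
            this.grantId = grantId;
            this.userId = userId;
            this.moduleId = moduleId;
            this.scope = scope;
        }
    }

    // Stand-ins for the two KTables: latest name per userId and per moduleId.
    final Map<String, String> userTable = new HashMap<>();
    final Map<String, String> moduleTable = new HashMap<>();

    // Stand-in for the enriched output stream.
    final List<String> output = new ArrayList<>();

    // Handles one grant record the way two chained inner stream-table joins would.
    void onGrant(Grant grant) {
        // Step 1: selectKey(moduleId), then join with moduleTable.
        String moduleName = moduleTable.get(grant.moduleId);
        if (moduleName == null) {
            return; // inner join: no matching module, so no output
        }
        // Step 2: selectKey(userId), then join with userTable.
        String userName = userTable.get(grant.userId);
        if (userName == null) {
            return; // inner join: no matching user, so no output
        }
        output.add(grant.grantId + ": " + userName + " -> " + moduleName
                + " (" + grant.scope + ")");
    }

    public static void main(String[] args) {
        GrantEnrichmentSketch sketch = new GrantEnrichmentSketch();
        sketch.userTable.put("u1", "Alice");
        sketch.moduleTable.put("m1", "Billing");
        sketch.onGrant(new Grant("g1", "u1", "m1", "read"));

        // A later table update does not re-emit grants that were already joined.
        sketch.userTable.put("u1", "Alicia");

        for (String line : sketch.output) {
            System.out.println(line);
        }
    }
}
```

Note the limitation this models: in a stream-table join only records arriving on the stream side produce output, so renaming a user or a module afterwards does not update grants that were already emitted.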

answered Oct 01 '22 09:10 by Nishu Tayal


This feature (foreign-key joins between KTables) was released as part of Kafka Streams 2.4.0.

Here's an official tutorial on using this feature.
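
As a rough illustration of what a foreign-key table-table join computes, here is a plain-Java sketch that does not depend on Kafka. In the real DSL this corresponds to the KTable.join overload that takes the other table, a foreign-key extractor function and a ValueJoiner. The class ForeignKeyJoinSketch and its methods are hypothetical names, and only the user side is modelled; a second join on moduleId would chain the same way.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of a foreign-key table-table inner join (no Kafka dependency).
// All names here are hypothetical.
class ForeignKeyJoinSketch {

    // Left table: grantId -> {userId, scope}. userId is the foreign key.
    final Map<String, String[]> grants = new HashMap<>();
    // Right table: userId -> user name.
    final Map<String, String> users = new HashMap<>();
    // Result table, still keyed by grantId: grantId -> "userName:scope".
    final Map<String, String> joined = new TreeMap<>();

    void upsertGrant(String grantId, String userId, String scope) {
        grants.put(grantId, new String[] {userId, scope});
        refresh(grantId);
    }

    void upsertUser(String userId, String name) {
        users.put(userId, name);
        // An update on the right side recomputes every grant that references it.
        for (Map.Entry<String, String[]> entry : grants.entrySet()) {
            if (entry.getValue()[0].equals(userId)) {
                refresh(entry.getKey());
            }
        }
    }

    // Recomputes the join result for one grant.
    private void refresh(String grantId) {
        String[] grant = grants.get(grantId);
        String userName = users.get(grant[0]); // lookup via the extracted foreign key
        if (userName == null) {
            joined.remove(grantId); // inner join: no match, no result row
        } else {
            joined.put(grantId, userName + ":" + grant[1]);
        }
    }

    public static void main(String[] args) {
        ForeignKeyJoinSketch sketch = new ForeignKeyJoinSketch();
        sketch.upsertGrant("g1", "u1", "read"); // user not known yet, no result
        sketch.upsertUser("u1", "Alice");       // result for g1 appears
        sketch.upsertUser("u1", "Alicia");      // result for g1 is updated
        System.out.println(sketch.joined);
    }
}
```

Unlike the rekeying workaround, the result stays keyed by grantId and is refreshed when either side changes, which is the main practical difference between the two approaches.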

answered Oct 01 '22 09:10 by user456584