 

Kafka Stream and KTable One-to-Many Relationship Join

I have a Kafka stream (say, for blogs) and a Kafka table (say, for comments related to those blogs). A key from the Kafka stream can map to multiple values in the Kafka table, i.e. one blog can have multiple comments. I want to join the two and create a new object with an array of comment IDs. But when I do the join, the stream contains only the last comment ID. Is there any documentation or example code that can point me in the right direction? Basically, is there any documentation explaining how to do a one-to-many relationship join between a Kafka stream and a Kafka table?

KStream<Integer, EnrichedBlog> joinedBlogComments = blogsStream.join(commentsTbl,
              (blogId, blog) -> blog.getBlogId(),
              (blog, comment) -> new EnrichedBlog(blog, comment));

So instead of a single comment, I need an array of comment IDs.

Asked May 31 '17 at 03:05 by Pratikshya Kuinkel



1 Answer

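I can't find a KStream-KTable join method with a signature matching your code example (a join that takes a KeyValueMapper to compute the lookup key like that exists only for joins against a GlobalKTable), but here's what I think the problem is: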

KTables are interpreted as a changelog: every subsequent message with the same key is treated as an update to the existing record, not as a new record. That is why you see only the last comment for a given key (blog ID); the previous values are overwritten. To overcome this, you need to change how you populate your KTable in the first place. Instead, add your comments topic to your topology as a KStream and perform an aggregation that builds an array or a list of the comments sharing the same blog ID. That aggregation returns a KTable, which you can then join with your blog KStream.
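To make the difference concrete, here is a plain-Java simulation (it does not use the Kafka Streams API) of the two ways the comments can be materialized: last-write-wins changelog semantics versus a per-key list aggregation, followed by an inner stream-table join that turns each list into an array of comment IDs. The Blog, Comment and EnrichedBlog records are hypothetical stand-ins, since the question doesn't show those classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java simulation (NOT the Kafka Streams API) of KTable changelog semantics
 * versus aggregating a comments stream into lists, followed by a stream-table join.
 */
public class BlogCommentsSimulation {

    // Hypothetical record shapes; the real Blog/Comment/EnrichedBlog classes are not shown.
    record Comment(int blogId, int commentId) {}
    record Blog(int blogId, String title) {}
    record EnrichedBlog(Blog blog, int[] commentIds) {}

    // KTable semantics: each record is an upsert, so the last comment per blog id wins.
    static Map<Integer, Integer> asChangelog(List<Comment> comments) {
        Map<Integer, Integer> table = new LinkedHashMap<>();
        for (Comment c : comments) {
            table.put(c.blogId(), c.commentId());
        }
        return table;
    }

    // groupByKey().aggregate(...) semantics: each record is appended to its key's list.
    static Map<Integer, List<Integer>> asAggregate(List<Comment> comments) {
        Map<Integer, List<Integer>> table = new LinkedHashMap<>();
        for (Comment c : comments) {
            table.computeIfAbsent(c.blogId(), k -> new ArrayList<>()).add(c.commentId());
        }
        return table;
    }

    // Inner stream-table join: each blog looks up its key in the aggregated table;
    // blogs with no entry produce no output. The list is converted to an array here.
    static List<EnrichedBlog> join(List<Blog> blogs, Map<Integer, List<Integer>> commentsByBlog) {
        List<EnrichedBlog> out = new ArrayList<>();
        for (Blog b : blogs) {
            List<Integer> ids = commentsByBlog.get(b.blogId());
            if (ids != null) {
                out.add(new EnrichedBlog(b, ids.stream().mapToInt(Integer::intValue).toArray()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Comment> comments = List.of(new Comment(1, 10), new Comment(1, 11), new Comment(2, 20));
        System.out.println(asChangelog(comments)); // {1=11, 2=20}
        System.out.println(asAggregate(comments)); // {1=[10, 11], 2=[20]}

        List<Blog> blogs = List.of(new Blog(1, "first"), new Blog(3, "no comments yet"));
        for (EnrichedBlog e : join(blogs, asAggregate(comments))) {
            System.out.println(e.blog().blogId() + " -> " + Arrays.toString(e.commentIds())); // 1 -> [10, 11]
        }
    }
}
```

In Kafka Streams itself, the join step would be blogsStream.join(commentsByBlog, joiner) with a ValueJoiner doing the list-to-array conversion. As in the simulation, a KStream-KTable join is an inner join, so use leftJoin if blogs without comments should still be emitted (the comment list is then null). The join is also triggered by the stream side: a blog record only sees the comments the table holds at the moment that record is processed.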

Here's a sketch of how you can construct a List-valued KTable:

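KTable<Integer, List<Comment>> commentsByBlog = builder // Comment: your comment value type
    .stream("yourCommentTopic", Consumed.with(Serdes.Integer(), yourCommentSerde)) // key is blog id
    .groupByKey()
    .aggregate(
        ArrayList::new,                    // start each blog id with an empty list
        (blogId, comment, comments) -> {   // append the comment and return the updated list
            comments.add(comment);
            return comments;
        },
        Materialized.with(Serdes.Integer(), yourListSerde));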

A list is easier to use in an aggregation than an array, so I suggest you convert it to an array downstream if needed. You will also need to provide a serde implementation for your list, "yourListSerde" in the example above.
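For the serde, one possible approach, assuming you aggregate only the comment IDs (as Integer) rather than whole comment objects, is a simple length-prefixed byte layout. The sketch below shows only the encode/decode logic in plain Java; the class name CommentIdListCodec and the byte layout are illustrative choices, not something Kafka prescribes.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of byte-level logic a list serde could use for a List<Integer> of comment IDs:
 * a 4-byte element count followed by each ID as a 4-byte big-endian int.
 */
public class CommentIdListCodec {

    static byte[] encode(List<Integer> ids) {
        if (ids == null) {
            return null; // null in, null out, as Kafka's built-in serializers do
        }
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES * (1 + ids.size()));
        buf.putInt(ids.size());
        for (int id : ids) {
            buf.putInt(id);
        }
        return buf.array();
    }

    static List<Integer> decode(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int size = buf.getInt();
        List<Integer> ids = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            ids.add(buf.getInt());
        }
        return ids;
    }

    public static void main(String[] args) {
        byte[] bytes = encode(List.of(10, 11));
        System.out.println(bytes.length);  // 12
        System.out.println(decode(bytes)); // [10, 11]
    }
}
```

To turn this into yourListSerde, you would wrap encode and decode in a Kafka Serializer and Deserializer and combine them with Serdes.serdeFrom(serializer, deserializer). Newer Kafka versions (3.0 and later) also ship a built-in Serdes.ListSerde, so check the Serdes documentation for your version before writing your own.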

Answered Oct 01 '22 at 22:10 by Michal Borowiecki
