I have 2 Kafka topics, recommendations and clicks. The first topic has recommendations objects keyed by a unique Id (called recommendationsId). Each recommended product has a URL which the user can click. The clicks topic gets the messages generated by clicks on those product URLs recommended to the user. It has been set up so that these click messages are also keyed by the recommendationId.
Note that:
- the relationship between recommendations and clicks is one-to-many. A recommendation may lead to multiple clicks, but a click is always associated with a single recommendation.
- each click object would have a corresponding recommendations object.
- a click object would have a timestamp later than the recommendations object.
- the gap between a recommendation and the corresponding click(s) could be a few seconds to a few days (say, 7 days at the most).
My goal is to join these two topics using a Kafka Streams join. What I am not clear about is whether I should use a KStream x KStream join or a KStream x KTable join.
I implemented the KStream x KTable join by joining the clicks stream with the recommendations table. However, I am not able to see any joined clicks-recommendations pair if the recommendations were generated before the joiner was started and the click arrives after the joiner started.
Am I using the right join? Should I be using a KStream x KStream join? If so, in order to be able to join a click with a recommendation at most 7 days in the past, should I set the window size to 7 days? Do I also need to set the "retention" period in this case?
My code to perform the KStream x KTable join is as follows. Note that I have defined classes Recommendations and Click and their corresponding serdes. The click messages are just plain Strings (the URL). This URL String is joined with the Recommendations object to create a Click object, which is emitted to the jointTopic.
```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

// Enclosing class name is hypothetical; Recommendations, Click and JoinSerdes
// are the poster's own classes (older KStreamBuilder API).
public class ClickRecommendationJoiner {

    public static void main(String[] args) {
        if (args.length != 4) {
            throw new IllegalArgumentException(
                "Expected 4 params: bootstrapList clicksTopic recsTopic jointTopic");
        }
        final String bootstrapList = args[0];
        final String clicksTopic = args[1];
        final String recsTopic = args[2];
        final String jointTopic = args[3];

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my_joiner_id");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapList);
        config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, JoinSerdes.CLICK_SERDE.getClass().getName());

        KStreamBuilder builder = new KStreamBuilder();
        // load clicks as KStream (value = clicked URL)
        KStream<String, String> clicksStream = builder.stream(Serdes.String(), Serdes.String(), clicksTopic);
        // load recommendations as KTable
        KTable<String, Recommendations> recsTable = builder.table(Serdes.String(), JoinSerdes.RECS_SERDE, recsTopic);
        // join the two; with leftJoin, recs is null if the key is not in the table yet
        KStream<String, Click> join = clicksStream.leftJoin(recsTable, (click, recs) -> new Click(click, recs));
        // emit the join to the jointTopic
        join.to(Serdes.String(), JoinSerdes.CLICK_SERDE, jointTopic);
        // let the action begin
        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
    }
}
```
This works fine as long as both recommendations and clicks have been generated after the joiner (the above program) is started. If, however, a click arrives for which the recommendation was generated before the joiner was started, I don't see any join happening. How do I fix this?
If the solution is to use a KStream x KStream join, then please help me understand what window size and what retention period I should select.
Stream-stream joins combine two event streams into a new stream. The streams are joined based on a common key, so keys are necessary. You define a time window, and records on either side of the join need to have timestamps within the defined window of each other.
Taking a leaf out of SQL's book, Kafka Streams supports three kinds of joins:
- Inner joins: emit an output when both input sources have records with the same key.
- Left joins: emit an output for each record in the left or primary input source.
- Outer joins: emit an output for each record in either input source.
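To make the inner/left difference concrete for the clicks/recommendations case, here is a toy in-memory model in plain Java, not the Kafka Streams API. The class ToyStreamTableJoin, the keys and the values are hypothetical; the "table" is just a HashMap snapshot, so this only illustrates what each join type emits when a click's recommendation is missing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of stream-table join semantics (NOT the Kafka Streams API).
// Each click is looked up by key in a snapshot of the recommendations table.
class ToyStreamTableJoin {

    // Inner join: a click produces output only if its key is already in the table.
    static List<String> innerJoin(List<String[]> clicks, Map<String, String> recsTable) {
        List<String> out = new ArrayList<>();
        for (String[] click : clicks) {              // click[0] = key, click[1] = url
            String rec = recsTable.get(click[0]);
            if (rec != null) {
                out.add(click[1] + "->" + rec);
            }
        }
        return out;
    }

    // Left join: every click produces output; a missing recommendation shows up as null.
    static List<String> leftJoin(List<String[]> clicks, Map<String, String> recsTable) {
        List<String> out = new ArrayList<>();
        for (String[] click : clicks) {
            String rec = recsTable.get(click[0]);     // null if key not (yet) in the table
            out.add(click[1] + "->" + rec);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> recs = new HashMap<>();
        recs.put("rec-1", "RecsA");
        List<String[]> clicks = List.of(
            new String[] {"rec-1", "url-a"},
            new String[] {"rec-2", "url-b"});        // rec-2 is not in the table yet
        System.out.println(innerJoin(clicks, recs)); // [url-a->RecsA]
        System.out.println(leftJoin(clicks, recs));  // [url-a->RecsA, url-b->null]
    }
}
```

The null in the left-join output corresponds to what the question's leftJoin passes to the joiner when no recommendation for the key is in the KTable at the moment the click is processed.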
Your overall observation is correct. Conceptually, you can get the correct result both ways. If you use a stream-table join, you have two disadvantages (this might be revisited and improved in a future release of Kafka, though):
1. A click can be processed before its recommendation is in the KTable. With your leftJoin, such a click is joined with a null recommendation. To handle this, you could write clicks that got a null recommendation back into the clicks topic so they are processed again later (ie, you get a retry logic); of course, consecutive clicks for a single recommendation might then get out of order, and you might need to account for this in your application code.
2. The second disadvantage of the KTable would be that it will grow forever and unbounded over time, as you will add more and more unique recommendations to it. Thus, you will need to implement some "expiration logic" by sending tombstone records of the form <recommendationsId, null> to the recommendations topic to delete old recommendations you don't care about any longer.

If you use a stream-stream join, and a click can happen 7 days after a recommendation, your window size must be 7 days; otherwise, the click would not join with the recommendation.
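The window requirement can be sketched with a toy in-memory model of a windowed stream-stream join, again in plain Java rather than the Kafka Streams API. ToyWindowedJoin, its Rec type and the sample timestamps are hypothetical, and the model assumes, as stated in the question, that a click always comes after its recommendation.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of a windowed stream-stream join (NOT the Kafka Streams API).
class ToyWindowedJoin {

    // A keyed, timestamped record: value is a URL for clicks, a payload for recommendations.
    static final class Rec {
        final String key;
        final long ts;
        final String value;
        Rec(String key, long ts, String value) { this.key = key; this.ts = ts; this.value = value; }
    }

    // Join each click with every recommendation that has the same key and
    // was produced at most windowMs before the click.
    static List<String> join(List<Rec> clicks, List<Rec> recs, long windowMs) {
        List<String> out = new ArrayList<>();
        for (Rec c : clicks) {
            for (Rec r : recs) {
                long gap = c.ts - r.ts;
                if (c.key.equals(r.key) && gap >= 0 && gap <= windowMs) {
                    out.add(c.value + "->" + r.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long day = Duration.ofDays(1).toMillis();
        List<Rec> recs = List.of(new Rec("rec-1", 0L, "RecsA"));
        List<Rec> clicks = List.of(
            new Rec("rec-1", 6 * day, "url-a"),           // clicked 6 days later
            new Rec("rec-1", 8 * day, "url-b"));          // clicked 8 days later
        System.out.println(join(clicks, recs, 7 * day)); // [url-a->RecsA]
        System.out.println(join(clicks, recs, 1 * day)); // []
    }
}
```

With a 1-day window even the 6-day-old click is lost, which is why the window must cover the longest expected gap between a recommendation and its clicks.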
For a stream-stream join, the retention time answer is a little different. It must be at least 7 days, as the window size is 7 days; otherwise, you would delete records of your "running window". You can also set the retention period longer, to be able to process "late data". Assume a user clicks at the end of the window timeframe (5 minutes before the 7-day time span of the recommendation ends), but the click is only reported 1 hour later to your application. If your retention period is 7 days, the same as your window size, this late-arriving record cannot be processed anymore (as the recommendation would have been deleted already). If you set a larger retention period of, e.g., 8 days, you can still process late records. Which retention time you want to use depends on your application's semantic needs.
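The late-click scenario above can be checked with a small toy model. ToyRetention.canJoin is hypothetical and deliberately simplified: it assumes a buffered recommendation is dropped exactly when stream time passes its timestamp plus the retention period, and that stream time has advanced to the moment the late click is reported. Kafka Streams' actual store cleanup is not modeled here.

```java
import java.time.Duration;

// Toy model of how retention bounds late data in a windowed join
// (NOT the Kafka Streams API; cleanup is modeled as exact, which is a simplification).
class ToyRetention {

    // recTs: recommendation event time; clickTs: click event time;
    // streamTimeAtArrival: how far stream time has advanced when the click is processed.
    static boolean canJoin(long recTs, long clickTs, long streamTimeAtArrival,
                           long windowMs, long retentionMs) {
        boolean insideWindow = clickTs >= recTs && clickTs - recTs <= windowMs;
        // The buffered recommendation is gone once stream time passes recTs + retention.
        boolean recStillStored = streamTimeAtArrival <= recTs + retentionMs;
        return insideWindow && recStillStored;
    }

    public static void main(String[] args) {
        long window = Duration.ofDays(7).toMillis();
        long recTs = 0L;
        // The click happens 5 minutes before the 7-day window closes ...
        long clickTs = window - Duration.ofMinutes(5).toMillis();
        // ... but is only reported 1 hour later.
        long arrival = clickTs + Duration.ofHours(1).toMillis();

        System.out.println(canJoin(recTs, clickTs, arrival, window, window));                        // false
        System.out.println(canJoin(recTs, clickTs, arrival, window, Duration.ofDays(8).toMillis())); // true
    }
}
```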
Summary: From an implementation point of view, using a stream-stream join is simpler than using a stream-table join. However, higher memory/disk usage must be expected, because both streams are buffered for the whole retention period, and this could be large depending on your click stream data rate.
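To get a feel for how large that state could be, here is a back-of-envelope estimate, assuming a windowed join keeps roughly every record of each input stream for the retention period. The class JoinStateEstimate, the rates (100 clicks/s, 10 recommendations/s) and the 8-day retention are hypothetical; plug in your own numbers.

```java
import java.time.Duration;

// Back-of-envelope estimate (a simplification): a windowed join keeps roughly
// every record from the last `retention` period for each input stream.
class JoinStateEstimate {

    static long bufferedRecords(long recordsPerSecond, Duration retention) {
        return recordsPerSecond * retention.getSeconds();
    }

    public static void main(String[] args) {
        // Hypothetical rates, for illustration only.
        long clicks = bufferedRecords(100, Duration.ofDays(8));
        long recs = bufferedRecords(10, Duration.ofDays(8));
        System.out.println(clicks); // 69120000
        System.out.println(recs);   // 6912000
    }
}
```

By contrast, the stream-table join only keeps one entry per recommendationsId that has not yet been deleted with a tombstone.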