Based on the Apache Kafka docs, KStream-to-KStream joins are always windowed joins. My question is: how can I control the size of the window? Is it the same as the retention period for data on the topic? Or, for example, can we keep data for one month but join the streams over just the past week?
Is there a good example of a windowed KStream-to-KStream join?
In my case, let's say I have two KStreams, kstream1 and kstream2. I want to be able to join 10 days of kstream1 to 30 days of kstream2.
So the expected output would be 8000 records.
Outer KStream-KStream Join: An outer join will emit an output each time an event is processed in either stream. If the window state already contains an element with the same key in the other stream, it will apply the join method to both elements. If not, it will only apply the incoming element.
Windowing. Windowing lets you control how to group records that have the same key for stateful operations such as aggregations or joins into so-called windows. Windows are tracked per record key. Windowing operations are available in the Kafka Streams DSL.
A KTable is a changelog stream of updates—thus, a “plain” KTable is a stateless stream with different semantics than a KStream. However, often KTables are also materialized into a local state store, building a table that always contains the latest value for a key. If two KTables are joined they are always materialized.
Create the Kafka Streams topology: A stream is opened up for each input topic. The input streams are then combined using the merge function, which creates a new stream that represents all of the events of its inputs.
That is absolutely possible. When you define your stream operator, you specify the join window size explicitly.
KStream stream1 = ...;
KStream stream2 = ...;
long joinWindowSizeMs = 5L * 60L * 1000L; // 5 minutes
long windowRetentionTimeMs = 30L * 24L * 60L * 60L * 1000L; // 30 days
stream1.leftJoin(stream2,
    ... // add ValueJoiner
    JoinWindows.of(joinWindowSizeMs)
);
// or if you want to use retention time
stream1.leftJoin(stream2,
    ... // add ValueJoiner
    (JoinWindows) JoinWindows.of(joinWindowSizeMs)
                             .until(windowRetentionTimeMs)
);
See http://docs.confluent.io/current/streams/developer-guide.html#joining-streams for more details.
The sliding window basically defines an additional join predicate. In SQL-like syntax this would be something like:
SELECT * FROM stream1, stream2
WHERE
stream1.key = stream2.key
AND
stream1.ts - before <= stream2.ts
AND
stream2.ts <= stream1.ts + after
where before == after == joinWindowSizeMs in this example. before and after can also have different values if you use JoinWindows#before() and JoinWindows#after() to set those values explicitly.
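To make the predicate above concrete, here is a minimal plain-Java sketch of it, with no Kafka dependency. The class name JoinWindowSketch and the method withinWindow are hypothetical names made up for illustration; they are not part of the Kafka Streams API, and the sketch only mirrors the SQL-like condition, not how Kafka Streams evaluates joins internally.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not part of Kafka Streams.
final class JoinWindowSketch {

    // True if a stream2 record with timestamp ts2 falls into the join window
    // of a stream1 record with timestamp ts1:
    //   ts1 - before <= ts2 <= ts1 + after
    static boolean withinWindow(long ts1, long ts2, long beforeMs, long afterMs) {
        return ts1 - beforeMs <= ts2 && ts2 <= ts1 + afterMs;
    }

    public static void main(String[] args) {
        long fiveMin = TimeUnit.MINUTES.toMillis(5);
        long t = TimeUnit.HOURS.toMillis(1); // arbitrary stream1 timestamp

        // Symmetric window (before == after): both bounds are inclusive.
        System.out.println(withinWindow(t, t - fiveMin, fiveMin, fiveMin));
        System.out.println(withinWindow(t, t + fiveMin + 1, fiveMin, fiveMin));

        // Asymmetric window: look back 5 minutes, but not forward at all.
        System.out.println(withinWindow(t, t + fiveMin, fiveMin, 0L));
    }
}
```

With before == after the window is symmetric around the stream1 timestamp; passing different values corresponds to setting the two bounds separately, as described above.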
The retention time of the source topics is completely independent of the specified windowRetentionTimeMs, which is applied to a changelog topic created by Kafka Streams itself. Window retention allows out-of-order records to be joined with each other, i.e., records that arrive late (keep in mind that Kafka has an offset-based ordering guarantee, but with regard to timestamps, records can be out of order).
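As a rough illustration of why retention matters for late records, the following toy model accepts a record only if its timestamp is no older than the highest timestamp seen so far minus the retention time. This is a deliberate simplification under my own assumptions (non-negative timestamps, a single global "highest timestamp"); the class RetentionSketch and its accept method are hypothetical and do not reflect how Kafka Streams actually implements its window stores.

```java
// Toy model for illustration only; not Kafka Streams code.
final class RetentionSketch {
    private final long retentionMs;
    private long maxSeenTs = 0L; // assumes non-negative record timestamps

    RetentionSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Returns true if a record with this timestamp would still fall inside
    // the retained range, false if it arrives too late to be joined.
    boolean accept(long ts) {
        maxSeenTs = Math.max(maxSeenTs, ts);
        return ts >= maxSeenTs - retentionMs;
    }

    public static void main(String[] args) {
        RetentionSketch store = new RetentionSketch(100L);
        System.out.println(store.accept(1000L)); // in order
        System.out.println(store.accept(950L));  // late, but within retention
        System.out.println(store.accept(899L));  // older than 1000 - 100
    }
}
```

In this model a longer retention time simply widens the range of late timestamps that are still accepted, independently of the join window size.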
In addition to what Matthias J. Sax said, there is a stream-to-stream (windowed) join example at: https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/test/java/io/confluent/examples/streams/StreamToStreamJoinIntegrationTest.java
This is for Confluent 3.1.x with Apache Kafka 0.10.1, i.e. the latest versions as of January 2017. See the master branch in the repository above for code examples that use newer versions.
Here's the key part of the code example above (again, for Kafka 0.10.1), slightly adapted to your question. Note that this example happens to demonstrate an OUTER JOIN.
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

long joinWindowSizeMs = TimeUnit.MINUTES.toMillis(5);
long windowRetentionTimeMs = TimeUnit.DAYS.toMillis(30);
final Serde<String> stringSerde = Serdes.String();
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> alerts = builder.stream(stringSerde, stringSerde, "adImpressionsTopic");
KStream<String, String> incidents = builder.stream(stringSerde, stringSerde, "adClicksTopic");
KStream<String, String> impressionsAndClicks = alerts.outerJoin(incidents,
    (impressionValue, clickValue) -> impressionValue + "/" + clickValue,
    // KStream-KStream joins are always windowed joins, hence we must provide a join window.
    JoinWindows.of(joinWindowSizeMs).until(windowRetentionTimeMs),
    stringSerde, stringSerde, stringSerde);
// Write the results to the output topic.
impressionsAndClicks.to(stringSerde, stringSerde, "outputTopic");