I have three streams of data: foo, bar, and baz. I need to join these streams with a LEFT OUTER JOIN in the following chain: foo -> bar -> baz. Here's an attempt to mimic these streams with the built-in rate stream:
import org.apache.spark.sql.functions._

val rateStream = session.readStream
  .format("rate")
  .option("rowsPerSecond", 5)
  .option("numPartitions", 1)
  .load()

val fooStream = rateStream
  .select(col("value").as("fooId"), col("timestamp").as("fooTime"))

val barStream = rateStream
  .where(rand() < 0.5) // Introduce misses for ease of debugging
  .select(col("value").as("barId"), col("timestamp").as("barTime"))

val bazStream = rateStream
  .where(rand() < 0.5) // Introduce misses for ease of debugging
  .select(col("value").as("bazId"), col("timestamp").as("bazTime"))
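As a side note, rand() is evaluated independently per micro-batch and per query, so the set of "missing" rows is not reproducible across runs. A deterministic filter makes the dropped rows stable between runs and easier to reason about while debugging. This is only a sketch; the modulo predicates below are an illustrative choice, not part of the original setup:

```scala
// Deterministic misses: drop fixed ids instead of a random half, so the
// same barId/bazId values are absent on every run of the query.
val barStream = rateStream
  .where(col("value") % 2 === 0)   // bar misses all odd ids
  .select(col("value").as("barId"), col("timestamp").as("barTime"))

val bazStream = rateStream
  .where(col("value") % 3 =!= 0)   // baz misses every third id
  .select(col("value").as("bazId"), col("timestamp").as("bazTime"))
```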
Here's my first approach to join these streams together, assuming that the potential delays for foo, bar, and baz are small (~5 seconds):
val foobarStream = fooStream
  .withWatermark("fooTime", "5 seconds")
  .join(
    barStream.withWatermark("barTime", "5 seconds"),
    expr("""
      barId = fooId AND
      fooTime >= barTime AND
      fooTime <= barTime + interval 5 seconds
    """),
    joinType = "leftOuter"
  )
val foobarbazQuery = foobarStream
  .join(
    bazStream.withWatermark("bazTime", "5 seconds"),
    expr("""
      bazId = fooId AND
      bazTime >= fooTime AND
      bazTime <= fooTime + interval 5 seconds
    """),
    joinType = "leftOuter"
  )
  .writeStream
  .format("console")
  .start()
With the setup above, I can observe the following tuples of data:

(some_foo, some_bar, some_baz)
(some_foo, some_bar, null)

but I'm still missing (some_foo, null, some_baz) and (some_foo, null, null).

Any ideas how to properly configure the watermarks in order to get all combinations?
UPDATE:

After adding an additional watermark for foobarStream, surprisingly on barTime:

val foobarbazQuery = foobarStream
  .withWatermark("barTime", "1 minute")
  .join(/* ... */)

I'm now able to get the (some_foo, null, some_baz) combination, but I'm still missing (some_foo, null, null)...
I'm leaving some information here just for reference.

Chaining stream-stream joins doesn't work correctly because Spark only supports a global watermark (instead of per-operator watermarks), which may cause intermediate outputs to be dropped between joins.

The Apache Spark community identified this issue and discussed it a while ago. Here's a link for more details: https://lists.apache.org/thread.html/cc6489a19316e7382661d305fabd8c21915e5faf6a928b4869ac2b4a@%3Cdev.spark.apache.org%3E

(Disclaimer: I'm the author who initiated the mail thread.)
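One common workaround, given this limitation, is to keep at most one stream-stream join per streaming query: materialize the first join's output to an external sink and read it back as a fresh stream for the second join, so each query's global watermark only has to serve a single join. The sketch below assumes Kafka as the intermediate sink; the topic name, bootstrap servers, and checkpoint path are hypothetical placeholders:

```scala
// Query 1: foo LEFT OUTER JOIN bar, written out as JSON to a Kafka topic.
// Serializing the whole row with to_json(struct(...)) is one simple option.
val foobarQuery = foobarStream
  .select(to_json(struct(col("*"))).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")   // placeholder
  .option("topic", "foobar")                        // placeholder
  .option("checkpointLocation", "/tmp/ckpt/foobar") // placeholder
  .start()

// Query 2: read the intermediate topic back as a new stream and join it
// with bazStream. This query now contains only one stream-stream join,
// so its watermark no longer drops the first join's intermediate output.
// (Parsing the JSON back into columns with from_json and a schema is
// omitted here for brevity.)
val foobarBack = session.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")
  .option("subscribe", "foobar")
  .load()
```

The trade-off is extra infrastructure and end-to-end latency, but it restores correct left-outer semantics for each individual join.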