 

Watermarking for Spark Structured Streaming with three-way joins

I have 3 streams of data: foo, bar and baz.

I need to join these streams with LEFT OUTER JOIN in the following chain: foo -> bar -> baz.

Here's an attempt to mimic these streams with the built-in rate source:

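import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr, rand}

// Local session for reproducing the problem
val session = SparkSession.builder()
  .appName("three-way-stream-join")
  .master("local[*]")
  .getOrCreate()

// Built-in rate source: emits (timestamp, value) rows at a fixed rate
val rateStream = session.readStream
  .format("rate")
  .option("rowsPerSecond", 5)
  .option("numPartitions", 1)
  .load()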

val fooStream = rateStream
  .select(col("value").as("fooId"), col("timestamp").as("fooTime"))

val barStream = rateStream
  .where(rand() < 0.5) // Introduce misses for ease of debugging
  .select(col("value").as("barId"), col("timestamp").as("barTime"))

val bazStream = rateStream
  .where(rand() < 0.5) // Introduce misses for ease of debugging
  .select(col("value").as("bazId"), col("timestamp").as("bazTime"))

Here's my first approach to joining these streams together, assuming that potential delays for foo, bar and baz are small (~5 seconds):

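// foo LEFT OUTER bar: a bar matches when barId = fooId and barTime is at most 5 seconds before fooTime
val foobarStream = fooStream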
  .withWatermark("fooTime", "5 seconds")
  .join(
    barStream.withWatermark("barTime", "5 seconds"),
    expr("""
       barId = fooId AND
       fooTime >= barTime AND
       fooTime <= barTime + interval 5 seconds
           """),
    joinType = "leftOuter"
  )

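// (foo, bar) LEFT OUTER baz: a baz matches when bazId = fooId and bazTime is at most 5 seconds after fooTime
val foobarbazQuery = foobarStream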
  .join(
    bazStream.withWatermark("bazTime", "5 seconds"),
    expr("""
      bazId = fooId AND
      bazTime >= fooTime AND
      bazTime <= fooTime + interval 5 seconds
         """),
    joinType = "leftOuter")
  .writeStream
  .format("console")
  .start()

With the setup above, I can observe the following tuples:

  • (some_foo, some_bar, some_baz)
  • (some_foo, some_bar, null)

but (some_foo, null, some_baz) and (some_foo, null, null) are still missing.

Any ideas on how to configure the watermarks properly in order to get all combinations?
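For comparison, a batch (non-streaming) evaluation of the same left outer chain produces all four tuple shapes. The sketch below models it with plain Scala collections rather than Spark; the case classes, the helper names and the sample ids/timestamps (in seconds) are hypothetical and only mirror the column names and join conditions of the query above.

```scala
// Batch reference for the foo -> bar -> baz left outer chain (plain Scala, no Spark).
// Case classes, helpers and sample data are hypothetical illustrations.
object LeftOuterChainReference {
  final case class Foo(fooId: Long, fooTime: Long)
  final case class Bar(barId: Long, barTime: Long)
  final case class Baz(bazId: Long, bazTime: Long)

  // Same predicates as the streaming query (times in seconds):
  // bar at most 5 s before foo, baz at most 5 s after foo.
  def barMatches(f: Foo, b: Bar): Boolean =
    b.barId == f.fooId && f.fooTime >= b.barTime && f.fooTime <= b.barTime + 5

  def bazMatches(f: Foo, z: Baz): Boolean =
    z.bazId == f.fooId && z.bazTime >= f.fooTime && z.bazTime <= f.fooTime + 5

  // Left outer semantics: an unmatched left row is kept once, paired with None.
  private def orNone[A](matches: Seq[A]): Seq[Option[A]] =
    if (matches.isEmpty) Seq(None) else matches.map(Some(_))

  // Both joins are keyed on foo, as in the question, so a missing bar
  // does not prevent a baz match.
  def join(foos: Seq[Foo], bars: Seq[Bar], bazs: Seq[Baz]): Seq[(Foo, Option[Bar], Option[Baz])] =
    for {
      f <- foos
      b <- orNone(bars.filter(barMatches(f, _)))
      z <- orNone(bazs.filter(bazMatches(f, _)))
    } yield (f, b, z)

  def main(args: Array[String]): Unit = {
    val foos = Seq(Foo(1, 10), Foo(2, 20), Foo(3, 30), Foo(4, 40))
    val bars = Seq(Bar(1, 10), Bar(2, 20))
    val bazs = Seq(Baz(1, 10), Baz(3, 32))
    join(foos, bars, bazs).foreach { case (f, b, z) =>
      println(s"(${f.fooId}, ${b.map(_.barId).getOrElse("null")}, ${z.map(_.bazId).getOrElse("null")})")
    }
  }
}
```

With this sample data it prints (1, 1, 1), (2, 2, null), (3, null, 3) and (4, null, null), i.e. one row per combination the streaming query is expected to produce.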

UPDATE:

After adding an additional watermark on foobarStream, surprisingly on barTime:

val foobarbazQuery = foobarStream
  .withWatermark("barTime", "1 minute")
  .join(/* ... */)

I get the (some_foo, null, some_baz) combination, but (some_foo, null, null) is still missing...

ChernikovP asked Aug 01 '18 15:08


1 Answer

I'm leaving some information just for reference.

Chaining stream-stream joins doesn't work correctly because Spark only supports a global watermark (instead of per-operator watermarks), which may cause intermediate outputs between joins to be dropped.
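To make the mechanism concrete, here is a toy model in plain Scala (no Spark) of a single micro-batch. It assumes Spark's default "min" policy for combining input watermarks, a 5-second delay on every input, and that the downstream join discards input rows at or behind the watermark; the object, functions and numbers are hypothetical, and Spark's actual state-eviction and late-row checks are more involved than this.

```scala
// Toy model of one micro-batch under a single global watermark.
// Not Spark's implementation; names and numbers are hypothetical.
object GlobalWatermarkToy {
  // Times are in seconds; only fooTime matters for this illustration.
  final case class FooRow(fooId: Long, fooTime: Long)

  // One watermark for the whole query: under a "min" policy the slowest input decides it.
  def globalWatermark(maxEventTimes: Seq[Long], delaySec: Long): Long =
    maxEventTimes.map(_ - delaySec).min

  // First join (foo LEFT OUTER bar, barTime in [fooTime - 5, fooTime]):
  // an unmatched foo is emitted as (foo, null) only once no bar can still
  // arrive for it, i.e. once the watermark has reached fooTime.
  def nullPaddedOutput(unmatched: Seq[FooRow], watermark: Long): Seq[FooRow] =
    unmatched.filter(_.fooTime <= watermark)

  // Second join: input rows at or behind the same watermark are treated as
  // late and discarded before they can be joined with baz.
  def dropLate(rows: Seq[FooRow], watermark: Long): Seq[FooRow] =
    rows.filter(_.fooTime > watermark)

  def main(args: Array[String]): Unit = {
    // Max event times seen so far on foo, bar and baz.
    val wm = globalWatermark(Seq(100L, 102L, 101L), delaySec = 5L)
    val unmatchedFoo = Seq(FooRow(1L, 90L), FooRow(2L, 94L), FooRow(3L, 99L))
    val emitted = nullPaddedOutput(unmatchedFoo, wm)
    val reachedSecondJoin = dropLate(emitted, wm)
    println(s"watermark=$wm emitted=${emitted.map(_.fooId)} reachedSecondJoin=${reachedSecondJoin.map(_.fooId)}")
  }
}
```

Because both operators share one watermark, every (foo, null) row the first join can emit is, by construction, already at or behind that watermark, so the second join drops all of them. That is consistent with the missing (some_foo, null, ...) tuples in the question.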

The Apache Spark community identified this issue and discussed it a while ago. Here's a link for more details: https://lists.apache.org/thread.html/cc6489a19316e7382661d305fabd8c21915e5faf6a928b4869ac2b4a@%3Cdev.spark.apache.org%3E

(Disclaimer: I'm the author who initiated the mail thread.)

Jungtaek Lim answered Oct 11 '22 12:10