Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Structured streaming : watermark vs. exactly-once semantics

The programming guide says that structured streaming guarantees end-to-end exactly once semantics using appropriate sources/sinks.

However I'm not understanding how this works when the job crashes and we have a watermark applied.

Below is an example of how I currently imagine it working, please correct me on any points that I'm misunderstanding. Thanks in advance!

Example:

Spark Job: Count # events in each 1 hour window, with a 1 hour Watermark.

Messages:

  • A - timestamp 10am
  • B - timestamp 10:10am
  • C - timestamp 10:20am
  • X - timestamp 12pm
  • Y - timestamp 12:50pm
  • Z - timestamp 8pm

We start the job, read A, B, C from the Source and the job crashes at 10:30am before we've written them out to our Sink.

At 6pm the job comes back up and knows to re-process A, B, C using the saved checkpoint/WAL. The final count is 3 for the 10-11am window.

Next, it reads the new messages from Kafka, X, Y, Z in parallel since they belong to different partitions. Z is processed first, so the max event timestamp gets set to 8pm. When the job reads X and Y, they are now behind the watermark (8pm - 1 hour = 7pm), so they are discarded as old data. The final count is 1 for 8-9pm, and the job does not report anything for the 12-1pm window. We've lost data for X and Y.

---End example---

Is this scenario accurate? If so, the 1 hour watermark may be sufficient to handle late/out-of-order data when flowing normally from Kafka-Sspark, but not when the spark job goes down/Kafka connection is lost for a long period of time. Would the only option to avoid data loss be to use a watermark longer than you expect the job to ever go down for?

like image 219
Ray J Avatar asked Aug 08 '17 22:08

Ray J


Video Answer


2 Answers

The watermark is a fixed value during the minibatch. In your example, since X, Y and Z are processed in the same minibatch, watermark used for this records would be 9:20am. After completion of that minibatch watermark would be updated to 7pm.

Below the quote from the design doc for the feature SPARK-18124 which implements watermarking functionality:

To calculate the drop boundary in our trigger based execution, we have to do the following.

  • In every trigger, while aggregate the data, we also scan for the max value of event time in the trigger data
  • After trigger completes, compute watermark = MAX(event time before trigger, max event time in trigger) - threshold

Probably simulation would be more description:

import org.apache.hadoop.fs.Path
import java.sql.Timestamp
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.ProcessingTime

val dir = new Path("/tmp/test-structured-streaming")
val fs = dir.getFileSystem(sc.hadoopConfiguration)
fs.mkdirs(dir)

val schema = StructType(StructField("vilue", StringType) ::
                        StructField("timestamp", TimestampType) ::
                        Nil)

val eventStream = spark
  .readStream
  .option("sep", ";")
  .option("header", "false")
  .schema(schema)
  .csv(dir.toString)

// Watermarked aggregation
val eventsCount = eventStream
  .withWatermark("timestamp", "1 hour")
  .groupBy(window($"timestamp", "1 hour"))
  .count

def writeFile(path: Path, data: String) {
  val file = fs.create(path)
  file.writeUTF(data)
  file.close()
}

// Debug query
val query = eventsCount.writeStream
  .format("console")
  .outputMode("complete")
  .option("truncate", "false")
  .trigger(ProcessingTime("5 seconds"))
  .start()

writeFile(new Path(dir, "file1"), """
  |A;2017-08-09 10:00:00
  |B;2017-08-09 10:10:00
  |C;2017-08-09 10:20:00""".stripMargin)

query.processAllAvailable()
val lp1 = query.lastProgress

// -------------------------------------------
// Batch: 0
// -------------------------------------------
// +---------------------------------------------+-----+
// |window                                       |count|
// +---------------------------------------------+-----+
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3    |
// +---------------------------------------------+-----+

// lp1: org.apache.spark.sql.streaming.StreamingQueryProgress =
// {
//   ...
//   "numInputRows" : 3,
//   "eventTime" : {
//     "avg" : "2017-08-09T10:10:00.000Z",
//     "max" : "2017-08-09T10:20:00.000Z",
//     "min" : "2017-08-09T10:00:00.000Z",
//     "watermark" : "1970-01-01T00:00:00.000Z"
//   },
//   ...
// }


writeFile(new Path(dir, "file2"), """
  |Z;2017-08-09 20:00:00
  |X;2017-08-09 12:00:00
  |Y;2017-08-09 12:50:00""".stripMargin)

query.processAllAvailable()
val lp2 = query.lastProgress

// -------------------------------------------
// Batch: 1
// -------------------------------------------
// +---------------------------------------------+-----+
// |window                                       |count|
// +---------------------------------------------+-----+
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3    |
// |[2017-08-09 12:00:00.0,2017-08-09 13:00:00.0]|2    |
// |[2017-08-09 20:00:00.0,2017-08-09 21:00:00.0]|1    |
// +---------------------------------------------+-----+
  
// lp2: org.apache.spark.sql.streaming.StreamingQueryProgress =
// {
//   ...
//   "numInputRows" : 3,
//   "eventTime" : {
//     "avg" : "2017-08-09T14:56:40.000Z",
//     "max" : "2017-08-09T20:00:00.000Z",
//     "min" : "2017-08-09T12:00:00.000Z",
//     "watermark" : "2017-08-09T09:20:00.000Z"
//   },
//   "stateOperators" : [ {
//     "numRowsTotal" : 3,
//     "numRowsUpdated" : 2
//   } ],
//   ...
// }

writeFile(new Path(dir, "file3"), "")

query.processAllAvailable()
val lp3 = query.lastProgress

// -------------------------------------------
// Batch: 2
// -------------------------------------------
// +---------------------------------------------+-----+
// |window                                       |count|
// +---------------------------------------------+-----+
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3    |
// |[2017-08-09 12:00:00.0,2017-08-09 13:00:00.0]|2    |
// |[2017-08-09 20:00:00.0,2017-08-09 21:00:00.0]|1    |
// +---------------------------------------------+-----+
  
// lp3: org.apache.spark.sql.streaming.StreamingQueryProgress =
// {
//   ...
//   "numInputRows" : 0,
//   "eventTime" : {
//     "watermark" : "2017-08-09T19:00:00.000Z"
//   },
//   "stateOperators" : [ ],
//   ...
// }

query.stop()
fs.delete(dir, true)

Notice how Batch 0 started with watermark 1970-01-01 00:00:00 while Batch 1 started with watermark 2017-08-09 09:20:00 (max event time of Batch 0 minus 1 hour). Batch 2, while empty, used watermark 2017-08-09 19:00:00.

like image 120
nonsleepr Avatar answered Sep 18 '22 23:09

nonsleepr


Z is processed first, so the max event timestamp gets set to 8pm.

That's correct. Even though Z may be computed first, the watermark is subtracted from the maximum timestamp in the current query iteration. This means that 08:00 PM will be set as the time in which we subtract the watermark time from, meaning 12:00 and 12:50 will be discarded.

From the documentation:

For a specific window starting at time T, the engine will maintain state and allow late data to update the state until (max event time seen by the engine - late threshold > T)


Would the only option to avoid data loss be to use a watermark longer than you expect the job to ever go down for

Not necessarily. Lets assume you set a maximum amount of data to be read per Kafka querying to 100 items. If you read small batches, and you're reading serially from each partition, each maximum timestamp for each batch may not be the maximum time of the latest message in the broker, meaning you won't lose these messages.

like image 30
Yuval Itzchakov Avatar answered Sep 19 '22 23:09

Yuval Itzchakov