I am reading the Structured Streaming documentation.
On the one hand, if I get it right, under Policy for handling multiple watermarks they say that when two streams have different watermarks, Spark uses a single global watermark for both of them: the minimum of the two by default, or the maximum if you specify it explicitly (so the other watermark is ignored).
On the other hand, under Inner Joins with optional Watermarking they have an example of two streams with different watermarks, and they say that for each stream the specified watermark is used (rather than just the minimum or the maximum serving as a global watermark for both).
Perhaps I don't understand what they really try to explain under Policy for handling multiple watermarks, because they say that if you set the multipleWatermarkPolicy to max then the global watermark moves at the pace of the fastest stream. That seems to be the complete opposite to me, because a bigger watermark means that the stream is slower.
Watermarks help Spark track processing progress in event time: when to produce windowed aggregates and when to trim the aggregation state. When joining streams of data, Spark by default uses a single global watermark that evicts state based on the minimum event time seen across the input streams.
As far as I understand, you would like to know how multiple watermarks behave for join operations, right? If so, I did some digging into the implementation to find the answer.
The spark.sql.streaming.multipleWatermarkPolicy property is used globally for all operations involving multiple watermarks, and its default is min. You can figure it out by looking at the WatermarkTracker#updateWatermark(executedPlan: SparkPlan) method called by MicroBatchExecution#runBatch. And runBatch is invoked by org.apache.spark.sql.execution.streaming.StreamExecution#runStream, which is a class responsible for... stream execution ;)
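For the record, if you want the max policy instead of the default min, you set that property on the session before starting the query. A minimal sketch, assuming an already created SparkSession called spark:

// Assuming `spark` is an existing SparkSession.
// Switch the global watermark policy from the default "min" to "max".
spark.conf.set("spark.sql.streaming.multipleWatermarkPolicy", "max")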
The updateWatermark implementation starts by collecting all event-time watermark nodes from the physical plan:
val watermarkOperators = executedPlan.collect {
  case e: EventTimeWatermarkExec => e
}
if (watermarkOperators.isEmpty) return

watermarkOperators.zipWithIndex.foreach {
  case (e, index) if e.eventTimeStats.value.count > 0 =>
    logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
    val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
    val prevWatermarkMs = operatorToWatermarkMap.get(index)
    if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
      operatorToWatermarkMap.put(index, newWatermarkMs)
    }

  // Populate 0 if we haven't seen any data yet for this watermark node.
  case (_, index) =>
    if (!operatorToWatermarkMap.isDefinedAt(index)) {
      operatorToWatermarkMap.put(index, 0)
    }
}
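To make the per-operator computation concrete, here is a tiny standalone sketch (plain Scala with made-up numbers, not Spark code) that mimics the logic above: each operator's candidate watermark is the maximum event time it has seen minus its configured delay, and it only ever moves forward:

import scala.collection.mutable

// Hypothetical illustration of the per-operator update above, with invented values.
val operatorToWatermarkMap = mutable.Map[Int, Long]()

def updateOperatorWatermark(index: Int, maxEventTimeMs: Long, delayMs: Long): Unit = {
  val newWatermarkMs = maxEventTimeMs - delayMs
  val prevWatermarkMs = operatorToWatermarkMap.get(index)
  // The per-operator watermark never goes backwards.
  if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
    operatorToWatermarkMap.put(index, newWatermarkMs)
  }
}

updateOperatorWatermark(0, 10000L, 4000L) // stream with a 4 s delay -> 6000 ms
updateOperatorWatermark(1, 10000L, 8000L) // stream with an 8 s delay -> 2000 ms
// operatorToWatermarkMap == Map(0 -> 6000, 1 -> 2000)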
To get an idea, a physical plan for a stream-to-stream join could look like this:
== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@6a1dff1d
+- StreamingSymmetricHashJoin [mainKey#10730], [joinedKey#10733], Inner, condition = [ leftOnly = null, rightOnly = null, both = (mainEventTimeWatermark#10732-T4000ms >= joinedEventTimeWatermark#10735-T8000ms), full = (mainEventTimeWatermark#10732-T4000ms >= joinedEventTimeWatermark#10735-T8000ms) ], state info [ checkpoint = file:/tmp/temporary-3416be37-81b4-471a-b2ca-9b8f8593843a/state, runId = 17a4e028-29cb-41b0-b34b-44e20409b335, opId = 0, ver = 13, numPartitions = 200], 389000, state cleanup [ left value predicate: (mainEventTimeWatermark#10732-T4000ms <= 388999000), right = null ]
:- Exchange hashpartitioning(mainKey#10730, 200)
: +- *(2) Filter isnotnull(mainEventTimeWatermark#10732-T4000ms)
: +- EventTimeWatermark mainEventTimeWatermark#10732: timestamp, interval 4 seconds
: +- *(1) Filter isnotnull(mainKey#10730)
: +- *(1) Project [mainKey#10730, mainEventTime#10731L, mainEventTimeWatermark#10732]
: +- *(1) ScanV2 MemoryStreamDataSource$[mainKey#10730, mainEventTime#10731L, mainEventTimeWatermark#10732]
+- Exchange hashpartitioning(joinedKey#10733, 200)
+- *(4) Filter isnotnull(joinedEventTimeWatermark#10735-T8000ms)
+- EventTimeWatermark joinedEventTimeWatermark#10735: timestamp, interval 8 seconds
+- *(3) Filter isnotnull(joinedKey#10733)
+- *(3) Project [joinedKey#10733, joinedEventTime#10734L, joinedEventTimeWatermark#10735]
+- *(3) ScanV2 MemoryStreamDataSource$[joinedKey#10733, joinedEventTime#10734L, joinedEventTimeWatermark#10735]
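For reference, the query behind a plan shaped like this could look roughly as follows. This is only a sketch: the rate sources, column names and join condition are assumptions modelled on the plan above, not the exact code that produced it.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder().master("local[*]").appName("multiple-watermarks").getOrCreate()

// First stream, watermarked with a 4 second delay (cf. EventTimeWatermark ... interval 4 seconds).
val mainStream = spark.readStream.format("rate").option("rowsPerSecond", "1").load()
  .selectExpr("CAST(value AS STRING) AS mainKey", "timestamp AS mainEventTimeWatermark")
  .withWatermark("mainEventTimeWatermark", "4 seconds")

// Second stream, watermarked with an 8 second delay (cf. EventTimeWatermark ... interval 8 seconds).
val joinedStream = spark.readStream.format("rate").option("rowsPerSecond", "1").load()
  .selectExpr("CAST(value AS STRING) AS joinedKey", "timestamp AS joinedEventTimeWatermark")
  .withWatermark("joinedEventTimeWatermark", "8 seconds")

// Inner stream-to-stream join with an event-time condition, similar to the plan's join condition.
val joined = mainStream.join(
  joinedStream,
  expr("mainKey = joinedKey AND mainEventTimeWatermark >= joinedEventTimeWatermark"))

joined.writeStream.format("console").start()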
Later, updateWatermark uses one of the available watermark policies, MinWatermark or MaxWatermark, depending on the value you set in spark.sql.streaming.multipleWatermarkPolicy. It's resolved that way in the MultipleWatermarkPolicy companion object:
def apply(policyName: String): MultipleWatermarkPolicy = {
  policyName.toLowerCase match {
    case DEFAULT_POLICY_NAME => MinWatermark
    case "max" => MaxWatermark
    case _ =>
      throw new IllegalArgumentException(s"Could not recognize watermark policy '$policyName'")
  }
}
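MinWatermark and MaxWatermark themselves are straightforward: as far as I can tell from the same file, they simply take the minimum or the maximum of the per-operator watermarks, roughly like this (a simplified rewrite, not a verbatim copy of the Spark source):

sealed trait MultipleWatermarkPolicy {
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long
}

// Default policy: the global watermark follows the slowest stream.
case object MinWatermark extends MultipleWatermarkPolicy {
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = operatorWatermarks.min
}

// "max" policy: the global watermark follows the fastest stream.
case object MaxWatermark extends MultipleWatermarkPolicy {
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = operatorWatermarks.max
}

With the per-operator values from the earlier sketch (6000 ms and 2000 ms), min yields 2000 ms and max yields 6000 ms, which is exactly why the max policy "moves at the pace of the fastest stream".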
updateWatermark uses the resolved policy to compute the watermark to apply to the query:
// Update the global watermark to the minimum of all watermark nodes.
// This is the safest option, because only the global watermark is fault-tolerant. Making
// it the minimum of all individual watermarks guarantees it will never advance past where
// any individual watermark operator would be if it were in a plan by itself.
val chosenGlobalWatermark = policy.chooseGlobalWatermark(operatorToWatermarkMap.values.toSeq)
if (chosenGlobalWatermark > globalWatermarkMs) {
  logInfo(s"Updating event-time watermark from $globalWatermarkMs to $chosenGlobalWatermark ms")
  globalWatermarkMs = chosenGlobalWatermark
} else {
  logDebug(s"Event time watermark didn't move: $chosenGlobalWatermark < $globalWatermarkMs")
}
However, I agree that the comment in the previous snippet is a little bit misleading, since it talks about "Update the global watermark to the minimum of all watermark nodes" even though the chosen policy may be max (https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala#L109).
The behavior with multiple watermarks is also asserted in EventTimeWatermarkSuite. Even though that test applies to a UNION, you saw in the first two parts that the watermark is updated the same way for all combination operations.
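If you want to reproduce a scenario close to that suite yourself, a union of two differently watermarked streams could look roughly like this (a sketch reusing the spark session from the earlier snippet; the rate sources, delays and window size are assumptions):

import org.apache.spark.sql.functions.{col, window}

val left = spark.readStream.format("rate").load()
  .withColumnRenamed("timestamp", "eventTime")
  .withWatermark("eventTime", "10 seconds")

val right = spark.readStream.format("rate").load()
  .withColumnRenamed("timestamp", "eventTime")
  .withWatermark("eventTime", "20 seconds")

// The aggregation below sees a single global watermark, chosen from the two
// per-stream watermarks according to spark.sql.streaming.multipleWatermarkPolicy.
val counts = left.union(right)
  .groupBy(window(col("eventTime"), "5 seconds"))
  .count()

counts.writeStream.outputMode("append").format("console").start()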
To debug on your own, check the following entries in the logs:
[2019-07-05 08:30:09,729] org.apache.spark.internal.Logging$class INFO Streaming query made progress
- returns all information about each executed query. In its eventTime part you will find the watermark property, which should be different if you execute the same query with min and with max multipleWatermarkPolicy.
[2019-07-05 08:30:35,685] org.apache.spark.internal.Logging$class INFO Updating event-time watermark from 0 to 6000 ms (org.apache.spark.sql.execution.streaming.WatermarkTracker:54)
- says that the watermark has just changed. As previously, it should be different according to the min/max property.
So to wrap up: starting from 2.4.0, we can choose which watermark to use as the global one (min or max). Prior to 2.4.0, the min watermark was the default (and only) choice (SPARK-24730). And this holds independently of the operation type (inner join, outer join, ...), because the watermark resolution method is the same for all queries.