
Spark policy for handling multiple watermarks

I am reading the Structured Streaming documentation.

On the one hand, if I get it right, under Policy for handling multiple watermarks they say that if two streams have different watermarks, Spark will use either the minimum value (by default) or the maximum value (if you specify it explicitly) as a single global watermark for both of them (so Spark will ignore the other one).

On the other hand, under Inner Joins with optional Watermarking they have an example of two streams with different watermarks, and they say that for each stream its specified watermark will be used (rather than just the minimum or the maximum as a global watermark for both).
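
If I read that section right, their example looks roughly like this (paraphrased from the documentation; impressions and clicks stand for streaming DataFrames read elsewhere):

    import org.apache.spark.sql.functions.expr

    // Two streams, each with its own watermark delay (2h vs 3h).
    val impressionsWithWatermark =
      impressions.withWatermark("impressionTime", "2 hours")
    val clicksWithWatermark =
      clicks.withWatermark("clickTime", "3 hours")

    // Inner join with an event-time range condition.
    impressionsWithWatermark.join(
      clicksWithWatermark,
      expr("""
        clickAdId = impressionAdId AND
        clickTime >= impressionTime AND
        clickTime <= impressionTime + interval 1 hour
      """))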

Perhaps I don't understand what they are really trying to explain under Policy for handling multiple watermarks, because they say that if you set multipleWatermarkPolicy to max then the global watermark moves at the pace of the fastest stream, but it should be the complete opposite, because a bigger watermark means that the stream is slower.

Alon asked Jun 16 '19 08:06


1 Answer

As far as I understand, you would like to know how multiple watermarks behave for join operations, right? If so, I dug into the implementation to find the answer.

multipleWatermarkPolicy configuration used globally

The spark.sql.streaming.multipleWatermarkPolicy property is used globally for all operations involving multiple watermarks, and its default is min. You can verify that by looking at the WatermarkTracker#updateWatermark(executedPlan: SparkPlan) method called by MicroBatchExecution#runBatch. And runBatch is invoked by org.apache.spark.sql.execution.streaming.StreamExecution#runStream, which is the class responsible for...stream execution ;)
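
For the record, switching the policy is a regular configuration entry that must be set before the query starts, e.g.:

    // Use the maximum of all per-operator watermarks as the global one
    // (the default is "min").
    spark.conf.set("spark.sql.streaming.multipleWatermarkPolicy", "max")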

updateWatermark implementation

updateWatermark starts by collecting all event-time watermark nodes from the physical plan:

    val watermarkOperators = executedPlan.collect {
      case e: EventTimeWatermarkExec => e
    }
    if (watermarkOperators.isEmpty) return

    watermarkOperators.zipWithIndex.foreach {
      case (e, index) if e.eventTimeStats.value.count > 0 =>
        logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
        val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
        val prevWatermarkMs = operatorToWatermarkMap.get(index)
        if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
          operatorToWatermarkMap.put(index, newWatermarkMs)
        }

      // Populate 0 if we haven't seen any data yet for this watermark node.
      case (_, index) =>
        if (!operatorToWatermarkMap.isDefinedAt(index)) {
          operatorToWatermarkMap.put(index, 0)
        }
    }
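
So each node's candidate watermark is simply its maximum observed event time minus its declared delay. With made-up numbers:

    // Per-operator watermark = max observed event time - declared delay.
    val maxEventTimeMs = 20000L // hypothetical: latest event seen at second 20
    val delayMs        = 4000L  // declared via withWatermark(..., "4 seconds")
    val newWatermarkMs = maxEventTimeMs - delayMs // => 16000 ms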

To get an idea, a physical plan for a stream-to-stream join could look like this:

== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@6a1dff1d
+- StreamingSymmetricHashJoin [mainKey#10730], [joinedKey#10733], Inner, condition = [ leftOnly = null, rightOnly = null, both = (mainEventTimeWatermark#10732-T4000ms >= joinedEventTimeWatermark#10735-T8000ms), full = (mainEventTimeWatermark#10732-T4000ms >= joinedEventTimeWatermark#10735-T8000ms) ], state info [ checkpoint = file:/tmp/temporary-3416be37-81b4-471a-b2ca-9b8f8593843a/state, runId = 17a4e028-29cb-41b0-b34b-44e20409b335, opId = 0, ver = 13, numPartitions = 200], 389000, state cleanup [ left value predicate: (mainEventTimeWatermark#10732-T4000ms <= 388999000), right = null ]
   :- Exchange hashpartitioning(mainKey#10730, 200)
   :  +- *(2) Filter isnotnull(mainEventTimeWatermark#10732-T4000ms)
   :     +- EventTimeWatermark mainEventTimeWatermark#10732: timestamp, interval 4 seconds
   :        +- *(1) Filter isnotnull(mainKey#10730)
   :           +- *(1) Project [mainKey#10730, mainEventTime#10731L, mainEventTimeWatermark#10732]
   :              +- *(1) ScanV2 MemoryStreamDataSource$[mainKey#10730, mainEventTime#10731L, mainEventTimeWatermark#10732]
   +- Exchange hashpartitioning(joinedKey#10733, 200)
      +- *(4) Filter isnotnull(joinedEventTimeWatermark#10735-T8000ms)
         +- EventTimeWatermark joinedEventTimeWatermark#10735: timestamp, interval 8 seconds
            +- *(3) Filter isnotnull(joinedKey#10733)
               +- *(3) Project [joinedKey#10733, joinedEventTime#10734L, joinedEventTimeWatermark#10735]
                  +- *(3) ScanV2 MemoryStreamDataSource$[joinedKey#10733, joinedEventTime#10734L, joinedEventTimeWatermark#10735]
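
For context, a query shaped like that plan could come from something along these lines (a sketch: only the watermark delays and column names mirror the plan; mainStream and joinedStream stand for streaming DataFrames defined elsewhere):

    import org.apache.spark.sql.functions.expr

    // Watermark delays of 4 and 8 seconds, matching the two
    // EventTimeWatermark nodes in the plan above.
    val mainWithWatermark = mainStream
      .withWatermark("mainEventTimeWatermark", "4 seconds")
    val joinedWithWatermark = joinedStream
      .withWatermark("joinedEventTimeWatermark", "8 seconds")

    mainWithWatermark.join(
      joinedWithWatermark,
      expr("mainKey = joinedKey AND " +
        "mainEventTimeWatermark >= joinedEventTimeWatermark"))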

Later, updateWatermark uses one of the available watermark policies, MinWatermark or MaxWatermark, depending on the value you set in spark.sql.streaming.multipleWatermarkPolicy. It's resolved this way in the MultipleWatermarkPolicy companion object:

  def apply(policyName: String): MultipleWatermarkPolicy = {
    policyName.toLowerCase match {
      case DEFAULT_POLICY_NAME => MinWatermark
      case "max" => MaxWatermark
      case _ =>
        throw new IllegalArgumentException(s"Could not recognize watermark policy '$policyName'")
    }
  }
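
The two policies themselves are trivial reductions over the per-operator watermarks; if I remember the source correctly, they look like this in the same file:

    case object MinWatermark extends MultipleWatermarkPolicy {
      def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = {
        assert(operatorWatermarks.nonEmpty)
        operatorWatermarks.min
      }
    }

    case object MaxWatermark extends MultipleWatermarkPolicy {
      def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = {
        assert(operatorWatermarks.nonEmpty)
        operatorWatermarks.max
      }
    }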

updateWatermark uses the resolved policy to compute the watermark to apply to the query:

    // Update the global watermark to the minimum of all watermark nodes.
    // This is the safest option, because only the global watermark is fault-tolerant. Making
    // it the minimum of all individual watermarks guarantees it will never advance past where
    // any individual watermark operator would be if it were in a plan by itself.
    val chosenGlobalWatermark = policy.chooseGlobalWatermark(operatorToWatermarkMap.values.toSeq)
    if (chosenGlobalWatermark > globalWatermarkMs) {
      logInfo(s"Updating event-time watermark from $globalWatermarkMs to $chosenGlobalWatermark ms")
      globalWatermarkMs = chosenGlobalWatermark
    } else {
      logDebug(s"Event time watermark didn't move: $chosenGlobalWatermark < $globalWatermarkMs")
    }
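
To make it concrete, suppose the two watermark nodes from the 4s/8s plan above currently hold 16000 ms and 12000 ms (hypothetical values):

    // Hypothetical per-operator watermarks, as collected in operatorToWatermarkMap.
    val operatorWatermarks = Seq(16000L, 12000L)
    operatorWatermarks.min // "min" policy (default) => 12000 ms
    operatorWatermarks.max // "max" policy           => 16000 ms

Note that this also resolves the "fastest stream" doubt from the question: a bigger watermark belongs to the stream whose event time has advanced the furthest, i.e. the fastest one, so the max policy makes the global watermark follow it.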

Misc

However, I agree that the comment in the previous snippet is a little bit misleading, since it talks about "Update the global watermark to the minimum of all watermark nodes." (https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala#L109)

The behavior on multiple watermarks is also asserted in EventTimeWatermarkSuite. Even though it applies to a UNION, you saw in the first 2 parts that the watermark is updated the same way for all combination operations.

To debug on your own, please check the following entries in the logs (a programmatic alternative is sketched after this list):

  • [2019-07-05 08:30:09,729] org.apache.spark.internal.Logging$class INFO Streaming query made progress - returns all information about each executed query. In its eventTime part you will find the watermark property, which should differ if you execute the same query with the min and the max multipleWatermarkPolicy
  • [2019-07-05 08:30:35,685] org.apache.spark.internal.Logging$class INFO Updating event-time watermark from 0 to 6000 ms (org.apache.spark.sql.execution.streaming.WatermarkTracker:54) - says that the watermark has just changed. As previously, it should differ according to the min/max property.
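
As mentioned above, you can also read the watermark programmatically instead of grepping the logs; StreamingQueryProgress exposes it in its eventTime map (a sketch, assuming a started StreamingQuery named query):

    // lastProgress may be null before the first micro-batch completes.
    val progress = query.lastProgress
    if (progress != null) {
      // eventTime is a java.util.Map[String, String]; its "watermark" entry
      // holds the current global watermark as a timestamp string.
      println(progress.eventTime.get("watermark"))
    }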

So to wrap up: starting from 2.4.0, we can choose one of the two watermarks (min or max). Prior to 2.4.0, the min watermark was the only option (SPARK-24730 added the policy). And this holds independently of the operation type (inner join, outer join, ...), because the watermark resolution method is the same for all queries.

Bartosz Konieczny answered Oct 12 '22 13:10