Spark streaming checkpoints for DStreams

In Spark Streaming it is possible (and mandatory if you are going to use stateful operations) to configure the StreamingContext to checkpoint both of the following to reliable storage (S3, HDFS, ...):

  • Metadata
  • DStream lineage

As described in the Spark documentation, to set the checkpoint storage location you need to call yourSparkStreamingCtx.checkpoint(datastoreURL).

On the other hand, it is possible to set a lineage checkpoint interval for each DStream by calling checkpoint(timeInterval) on it. In fact, the recommendation is to set the lineage checkpoint interval to between 5 and 10 times the DStream's sliding interval:

dstream.checkpoint(checkpointInterval). Typically, a checkpoint interval of 5 - 10 sliding intervals of a DStream is a good setting to try.
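
For reference, the two calls described above could be combined roughly as follows. This is only a minimal sketch: the application name, the local master, the /tmp checkpoint directory, and the socket source on localhost:9999 are placeholder assumptions, not part of my actual setup.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

object CheckpointSetupSketch {
  // An input DStream slides once per batch, so its sliding interval is the batch interval.
  val batchInterval: Duration = Seconds(2)
  // Following the "5 to 10 sliding intervals" recommendation: 5 * 2s = 10s.
  val checkpointInterval: Duration = batchInterval * 5

  def main(args: Array[String]): Unit = {
    // Hypothetical app name and master, for illustration only.
    val conf = new SparkConf().setAppName("checkpoint-setup-sketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, batchInterval)

    // Checkpoint directory on the context. A local path is an assumption made
    // for the sketch; a real deployment would point at HDFS, S3 or similar.
    ssc.checkpoint("/tmp/spark-checkpoints")

    // Hypothetical source: a text socket on localhost:9999.
    val lines = ssc.socketTextStream("localhost", 9999)

    // Explicit lineage checkpoint interval for this particular DStream.
    lines.checkpoint(checkpointInterval)

    lines.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

The question below is about what happens when the lines.checkpoint(...) call is left out.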

My question is:

When the streaming context has been configured for checkpointing and ds.checkpoint(interval) is never called, is lineage checkpointing enabled for all DStreams with a default checkpointInterval equal to batchInterval? Or is only metadata checkpointing enabled?

Pablo Francisco Pérez Hidalgo asked Dec 31 '15 18:12



1 Answer

Checking the Spark code (v1.5), I found that checkpointing of a DStream is enabled under two circumstances:

By an explicit call to the DStream's own checkpoint method (not the StreamingContext's):

/**
* Enable periodic checkpointing of RDDs of this DStream
* @param interval Time interval after which generated RDD will be checkpointed
*/
def checkpoint(interval: Duration): DStream[T] = {
    if (isInitialized) {
        throw new UnsupportedOperationException(
            "Cannot change checkpoint interval of an DStream after streaming context has started")
    }
    persist()
    checkpointDuration = interval
    this
}

At DStream initialization, as long as the concrete DStream subclass has overridden the mustCheckpoint attribute (setting it to true):

 private[streaming] def initialize(time: Time) {
  ...
  ...   
   // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
   if (mustCheckpoint && checkpointDuration == null) {
     checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
     logInfo("Checkpoint interval automatically set to " + checkpointDuration)
   }
  ...
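
To make the default concrete, here is a small standalone sketch of the same arithmetic using plain milliseconds instead of Spark's Duration class. The object and method names are my own, purely for illustration. It shows that the automatic interval is the smallest multiple of slideDuration that is at least 10 seconds, so it does not have to equal the batch interval.

```scala
object DefaultCheckpointInterval {
  // Mirrors: slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
  // with durations expressed as milliseconds (10 seconds = 10000 ms).
  def defaultIntervalMs(slideMs: Long): Long = {
    require(slideMs > 0, "slide duration must be positive")
    val multiples = math.ceil(10000.0 / slideMs.toDouble).toInt
    slideMs * multiples
  }

  def main(args: Array[String]): Unit = {
    // Print the automatic interval for a few sample slide durations.
    Seq(500L, 2000L, 3000L, 10000L, 15000L).foreach { slideMs =>
      println(s"slide=${slideMs}ms -> checkpoint every ${defaultIntervalMs(slideMs)}ms")
    }
  }
}
```

For example, a 2 second slide gives a 10 second interval, a 3 second slide gives 12 seconds, and a 15 second slide gives 15 seconds.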

The first case is obvious. For the second, I performed a naive search over the Spark Streaming code:

grep "val mustCheckpoint = true" $(find . -type f -name "*.scala")

> ./org/apache/spark/streaming/api/python/PythonDStream.scala:  override val mustCheckpoint = true
> ./org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala:  override val mustCheckpoint = true
> ./org/apache/spark/streaming/dstream/StateDStream.scala:  override val mustCheckpoint = true

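From this I can see that, in general (ignoring PythonDStream), setting a checkpoint directory on the StreamingContext only enables lineage checkpoints for StateDStream and ReducedWindowedDStream instances. Everything else gets metadata checkpointing only, unless checkpoint(interval) is called explicitly. These two classes are the result of the following transformations, respectively: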

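  • updateStateByKey: that is, a stream that carries state across batches.
  • reduceByKeyAndWindow: specifically the variant that takes an inverse reduce function; as far as I can tell, the variant without one is built from reduceByKey and window and does not create a ReducedWindowedDStream.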
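
As an illustration, a job like the following would end up with both kinds of automatically checkpointed streams without ever calling checkpoint(interval) on a DStream. Treat it as a sketch under assumptions: the app name, local master, /tmp checkpoint directory, and socket source on localhost:9999 are placeholders, and updateCount is a helper name I made up.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object AutoCheckpointedStreamsSketch {
  // Hypothetical state update: running count per key.
  def updateCount(newValues: Seq[Int], state: Option[Int]): Option[Int] =
    Some(state.getOrElse(0) + newValues.sum)

  def main(args: Array[String]): Unit = {
    // Hypothetical app name and master, for illustration only.
    val conf = new SparkConf().setAppName("auto-checkpoint-sketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Only the context-level checkpoint directory is set (placeholder path).
    ssc.checkpoint("/tmp/spark-checkpoints")

    // Hypothetical source: words read from a text socket on localhost:9999.
    val pairs = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    // Backed by StateDStream, which overrides mustCheckpoint.
    val totals = pairs.updateStateByKey[Int](updateCount _)

    // The inverse-function variant is backed by ReducedWindowedDStream,
    // which also overrides mustCheckpoint.
    val windowed = pairs.reduceByKeyAndWindow(
      (a: Int, b: Int) => a + b,
      (a: Int, b: Int) => a - b,
      Seconds(30),
      Seconds(10))

    totals.print()
    windowed.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

The plain pairs stream in this sketch has no lineage checkpoint interval of its own, since nothing calls pairs.checkpoint(...).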
Pablo Francisco Pérez Hidalgo answered Oct 26 '22 04:10