In Spark Streaming it is possible (and mandatory if you're going to use stateful operations) to set the StreamingContext to perform checkpoints into a reliable data store (S3, HDFS, ...) of both:

- Metadata
- DStream lineage

As described here, to set the output data storage you need to call yourSparkStreamingCtx.checkpoint(datastoreURL).
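A minimal sketch of that call (Spark 1.x Scala API; the application name, the 5-second batch interval and the HDFS path are placeholders of my own, not values from the question):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("checkpoint-demo")   // hypothetical app name
val ssc  = new StreamingContext(conf, Seconds(5))          // 5 s batch interval

// Enables metadata checkpointing and sets the directory that any
// DStream lineage checkpoints will also be written to.
ssc.checkpoint("hdfs:///tmp/spark-checkpoints")            // placeholder path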
On the other hand, it is possible to set a lineage checkpoint interval for each DStream by calling checkpoint(timeInterval) on it. In fact, it is recommended to set the lineage checkpoint interval to between 5 and 10 times the DStream's sliding interval:

dstream.checkpoint(checkpointInterval). Typically, a checkpoint interval of 5 - 10 sliding intervals of a DStream is a good setting to try.
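For illustration, continuing the hypothetical ssc from the sketch above (the socket source and the 50-second interval are assumptions, chosen as 10 sliding intervals of a 5-second batch):

// Per-DStream lineage checkpoint interval; it must be a multiple of the
// stream's slide duration and must be set before ssc.start().
val lines = ssc.socketTextStream("localhost", 9999)   // hypothetical source
lines.checkpoint(Seconds(50))                         // 10 x the 5 s sliding interval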
My question is:

When the streaming context has been set to perform checkpointing and no ds.checkpoint(interval) is called, is lineage checkpointing enabled for all data streams with a default checkpointInterval equal to batchInterval? Or is, on the contrary, only metadata checkpointing enabled?
Checking the Spark code (v1.5) I found that DStreams' checkpoints are enabled under two circumstances:

By an explicit call to their checkpoint method (not StreamingContext's):
/**
 * Enable periodic checkpointing of RDDs of this DStream
 * @param interval Time interval after which generated RDD will be checkpointed
 */
def checkpoint(interval: Duration): DStream[T] = {
  if (isInitialized) {
    throw new UnsupportedOperationException(
      "Cannot change checkpoint interval of an DStream after streaming context has started")
  }
  persist()
  checkpointDuration = interval
  this
}
At DStream initialization, as long as the concrete DStream subclass has overridden the mustCheckpoint attribute (setting it to true):
private[streaming] def initialize(time: Time) {
  ...
  ...
  // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
  if (mustCheckpoint && checkpointDuration == null) {
    checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
    logInfo("Checkpoint interval automatically set to " + checkpointDuration)
  }
  ...
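To make that formula concrete, here is a small stand-alone sketch of the same arithmetic (plain Scala, not Spark code; the 3-second and 15-second slides are just examples):

// The automatic interval is the smallest multiple of slideDuration
// that is at least 10 seconds.
def autoCheckpointIntervalSeconds(slideSeconds: Double): Double =
  slideSeconds * math.ceil(10.0 / slideSeconds)

autoCheckpointIntervalSeconds(3.0)    // 12.0 -> a 3 s slide checkpoints every 12 s
autoCheckpointIntervalSeconds(15.0)   // 15.0 -> a 15 s slide checkpoints every 15 s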
The first case is obvious. Performing a naive analysis on the Spark Streaming code:
grep "val mustCheckpoint = true" $(find -type f -name "*.scala")
> ./org/apache/spark/streaming/api/python/PythonDStream.scala: override val mustCheckpoint = true
> ./org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala: override val mustCheckpoint = true
> ./org/apache/spark/streaming/dstream/StateDStream.scala: override val mustCheckpoint = true
I can find that, in general (ignoring PythonDStream), StreamingContext checkpointing only enables lineage checkpoints for StateDStream and ReducedWindowedDStream instances. These instances are the result of, respectively, the updateStateByKey and reduceByKeyAndWindow (with inverse reduce function) transformations.
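For completeness, a hedged sketch of the two transformations that create those subclasses, reusing the hypothetical lines stream from the sketch above (the window and slide durations are illustrative only):

// updateStateByKey produces a StateDStream; reduceByKeyAndWindow with an
// inverse reduce function produces a ReducedWindowedDStream. Both therefore
// get lineage checkpointing automatically once ssc.checkpoint(dir) is set.
val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1))

val runningCounts = pairs.updateStateByKey[Int] { (values: Seq[Int], state: Option[Int]) =>
  Some(values.sum + state.getOrElse(0))
}

val windowedCounts = pairs.reduceByKeyAndWindow(
  _ + _,          // reduce function
  _ - _,          // inverse reduce function
  Seconds(30),    // window duration
  Seconds(10))    // slide duration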