 

Why does Spark throw "SparkException: DStream has not been initialized" when restoring from checkpoint?

I am restoring a stream from an HDFS checkpoint (a ConstantInputDStream, for example) but I keep getting SparkException: <X> has not been initialized.

Is there something specific I need to do when restoring from checkpointing?

I can see that it wants DStream.zeroTime set, but when the stream is restored zeroTime is null. It doesn't get restored, possibly because it is a private member. I can see that the StreamingContext referenced by the restored stream does have a value for zeroTime.

initialize is a private method and gets called by StreamingContext.graph.start but not by StreamingContext.graph.restart, presumably because restart expects zeroTime to have been persisted.

Does someone have an example of a stream that recovers from a checkpoint and has a non-null value for zeroTime?

def createStreamingContext(): StreamingContext = {
    val ssc = new StreamingContext(sparkConf, Duration(1000))
    ssc.checkpoint(checkpointDir)
    ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)

val socketStream = ssc.socketTextStream(...)
socketStream.checkpoint(Seconds(1))
socketStream.foreachRDD(...)
Shane Kinsella asked Jan 29 '16

People also ask

What is DStream in Apache spark How does it work?

Discretized Stream or DStream is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from source, or the processed data stream generated by transforming the input stream.
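To make that concrete, here is a minimal sketch of a DStream pipeline: one input DStream from a socket, with each transformation yielding a new DStream. The app name, host, and port are illustrative, and running it requires a Spark installation.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DStreamExample").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Input DStream: a continuous stream of text lines from a TCP socket
    val lines = ssc.socketTextStream("localhost", 9999)

    // Each transformation produces a new (derived) DStream
    val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```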


2 Answers

The problem was that I created the DStreams after the StreamingContext had been recreated from the checkpoint, i.e. after StreamingContext.getOrCreate. Creating the DStreams and all transformations should have happened inside createStreamingContext.
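A corrected sketch of the question's code, with all DStream creation moved into the factory function. The checkpoint path, host, and port are hypothetical, and the snippet assumes a Spark runtime:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/checkpoint" // hypothetical path

// All DStream creation and transformations live inside the factory, so the
// DStream graph is rebuilt (and its checkpoint data restored) on recovery.
def createStreamingContext(): StreamingContext = {
  val sparkConf = new SparkConf().setAppName("Recoverable")
  val ssc = new StreamingContext(sparkConf, Duration(1000))
  ssc.checkpoint(checkpointDir)

  val socketStream = ssc.socketTextStream("localhost", 9999)
  socketStream.checkpoint(Seconds(1))
  socketStream.foreachRDD(rdd => println(rdd.count()))
  ssc
}

// getOrCreate only calls the factory when no checkpoint exists; otherwise it
// restores the context, including its DStream graph, from checkpointDir.
val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
ssc.start()
ssc.awaitTermination()
```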

The issue was filed as [SPARK-13316] "SparkException: DStream has not been initialized" when restoring StreamingContext from checkpoint and the dstream is created afterwards.

Shane Kinsella answered Oct 13 '22


This exception may also occur when you try to use the same checkpoint directory for two different Spark Streaming jobs.

Try using a unique checkpoint directory for each Spark job.
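One way to keep checkpoint directories distinct is to key each one by the job name, for example (the base path and job name here are assumptions, and createStreamingContext is the factory from the question):

```scala
import org.apache.spark.streaming.StreamingContext

// Sketch: derive a distinct checkpoint directory per job, so two
// streaming jobs never share checkpoint state.
val jobName = "my-streaming-job"                    // hypothetical job name
val checkpointDir = s"hdfs:///checkpoints/$jobName" // one directory per job

val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
```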

Ravi Shankar answered Oct 13 '22