How to configure checkpointing so a Spark Streaming application can be redeployed?

I'm using Spark Streaming to count unique users. I use updateStateByKey, so I need to configure a checkpoint directory. I also load the data from the checkpoint when starting the application, as in the example from the docs:

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // new context
    val lines = ssc.socketTextStream(...) // create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

Here is the question: if my code changes and I redeploy it, will the checkpoint still be loaded no matter how much the code has changed? Or do I need my own logic to persist my data and load it on the next run?

If I use my own logic to save and load the DStream state, then if the application restarts after a failure, won't the data be loaded both from the checkpoint directory and from my own database?

asked Sep 17 '15 by Bin Wang

2 Answers

The checkpoint itself includes your metadata, RDDs, DAG, and even your logic. If you change your logic and try to run from the last checkpoint, you are very likely to hit an exception. If you want to use your own logic to save your data somewhere as a checkpoint, you need to implement a Spark action that pushes your state to whatever database you choose; on the next run, load that data back as an initial RDD (if you are using the updateStateByKey API) and continue your logic from there.
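A minimal sketch of that pattern, assuming a key type of String and a count type of Long; saveStateToDb and loadStateFromDb are hypothetical stand-ins for whatever database client you actually use:

```scala
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UniqueUsers {
  // Hypothetical persistence helpers -- swap in your real database client.
  def loadStateFromDb(ssc: StreamingContext): RDD[(String, Long)] =
    ssc.sparkContext.parallelize(Seq.empty[(String, Long)]) // e.g. SELECT user, count FROM state

  def saveStateToDb(rdd: RDD[(String, Long)]): Unit =
    rdd.foreachPartition(_.foreach { case (user, count) =>
      // upsert (user, count) into your database here
    })

  def createContext(checkpointDir: String): StreamingContext = {
    val conf = new SparkConf().setAppName("unique-users")
    val ssc  = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)

    val events = ssc.socketTextStream("localhost", 9999).map(user => (user, 1L))

    // Fold each batch's counts into the running state.
    val updateFunc: (Seq[Long], Option[Long]) => Option[Long] =
      (batch, state) => Some(state.getOrElse(0L) + batch.sum)

    // Seed the state from the database so a fresh deployment
    // resumes where the previous one stopped.
    val counts = events.updateStateByKey(
      updateFunc,
      new HashPartitioner(ssc.sparkContext.defaultParallelism),
      loadStateFromDb(ssc))

    // Push every batch's state back out so it survives a redeploy.
    counts.foreachRDD(rdd => saveStateToDb(rdd))
    ssc
  }
}
```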

answered Nov 15 '22 by Xuan Huy Pham

I asked this question on the Spark mailing list and got an answer, which I've analyzed on my blog. I'll post the summary here:

The approach is to use both checkpointing and our own data-loading mechanism, loading our saved data as the initialRDD of updateStateByKey. That way, in both situations, the data is neither lost nor duplicated:

  1. When we change the code and redeploy the Spark application, we shut down the old application gracefully and clean up the checkpoint data (sketched after this list), so the only data loaded is the data we saved ourselves.

  2. When the Spark application fails and restarts, it loads the data from the checkpoint. But the DAG is saved as part of the checkpoint, so the restarted job will not load our own data as the initialRDD again. So the only data loaded is the checkpointed data.
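As a rough sketch of step 1, assuming the ssc and checkpointDirectory names from the question's code, the graceful redeploy could look like this:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Stop gracefully so the last batch finishes and its state
// reaches our own database before the old application exits.
ssc.stop(stopSparkContext = true, stopGracefully = true)

// Remove the old checkpoint so the redeployed code starts from
// the state we saved ourselves instead of restoring the old DAG.
val fs = FileSystem.get(new Configuration())
fs.delete(new Path(checkpointDirectory), true)
```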

answered Nov 15 '22 by Bin Wang