 

Reliable checkpoint (keeping complex state) for spark streaming jobs

We are using Spark 1.6 with JVM 1.6 on Red Hat 4.4.7 to run our Spark Streaming applications/jobs. Some of our streaming jobs keep complex state, which we represent with Scala case classes. While testing the upgrade cycle of these jobs we ran into the issue described below. Since streaming jobs run forever, we need help designing an application that is easy to upgrade.

I narrowed down the exact cases in which the job is unable to restart from the checkpoint:

  • Just restarting the job without changing anything did not cause the problem.
  • Restarting the job after making a random change (unrelated to state) did not cause the problem.
  • Restarting the job after changing the state-handling function (e.g. adding a print statement) did not cause the problem.
  • Restarting the job after changing the state class itself (adding a new Boolean field) did cause the problem.
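
For reference, the restarts above use Spark's standard checkpoint-recovery pattern via StreamingContext.getOrCreate; a minimal sketch of that pattern, with placeholder paths and names:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder checkpoint location and app name.
val checkpointDir = "hdfs:///checkpoints/my-streaming-job"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("stateful-job")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // ... build the DStream graph, including the stateful transformation, here ...
  ssc
}

// On restart, Spark first tries to rebuild the context from the checkpoint,
// deserializing the stored DStream graph and state data; a changed state
// class breaks exactly this deserialization step.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()
```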

After some googling, the general guidelines for handling this issue seem to be:

  1. Keep the state in a format that stores the schema along with the data, such as JSON or Avro (see the sketch after this list).
    • The client code has to serialize the state before storing it and de-serialize it after reading it back. This serialization and de-serialization happens on every streaming interval; mapWithState might help slightly.
    • Upgrading the state from version x to y has to be handled explicitly if multiple versions of the job can co-exist.
  2. Stop the input, finish processing the pending input, then restart as a fresh job with a new checkpoint.
    • Though this is simple to implement, it is not possible for a couple of our jobs, and it makes the upgrade cycle slightly more complex.
  3. In parallel, also save the state to an external store and, on an upgrade, load it as the initial RDD.
    • This introduces an external dependency for keeping the state.
    • Upgrading the state from version x to y has to be handled explicitly if multiple versions of the job can co-exist.
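
For option 1, a minimal sketch with mapWithState that keeps the state inside the checkpoint as a JSON string; the SessionState class, its fields, and the choice of json4s are illustrative assumptions, not part of our actual job:

```scala
import org.apache.spark.streaming.{State, StateSpec}
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization.{read, write}

// Hypothetical state class; the real job's case class would go here.
case class SessionState(count: Long, lastSeen: Long)

// The checkpoint only ever sees plain Strings, so the case class can evolve
// as long as the JSON stays readable (e.g. new fields added as Option, or an
// explicit version check during deserialization).
def trackState(key: String, value: Option[Long], state: State[String]): Long = {
  implicit val formats = DefaultFormats
  val previous = state.getOption().map(json => read[SessionState](json))
  val updated = previous match {
    case Some(s) => s.copy(count = s.count + 1, lastSeen = value.getOrElse(s.lastSeen))
    case None    => SessionState(1L, value.getOrElse(0L))
  }
  state.update(write(updated))
  updated.count
}

val spec = StateSpec.function(trackState _)
// events: DStream[(String, Long)]
// val stateful = events.mapWithState(spec)
```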

As the information is scattered all across the web, I found it hard to reach a conclusion. My questions are:

  1. If the structure of the state class changes, the checkpoint becomes invalid. Is there any other known case where the checkpoint becomes invalid, for example if the assembly jar or the functionality (not the structure) of the state class changes?
  2. What strategy are you using to enable easy upgrades of stateful Spark Streaming jobs?


1 Answer

Consider the case of an environment upgrade (JVM/Scala/Spark/etc.): there is no guarantee the checkpoint will remain usable forever, regardless of what changes.

The checkpoint is designed only to help recovery in the unfortunate event of a fault or crash; it is not designed to be used as a data store.

The best alternative is to periodically flush the state to a reliable store (HDFS, a database, etc.) and read it back as the initial RDD in the event of any form of upgrade.
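
A minimal sketch of that approach, reusing the mapWithState example from the question (`stateful` and `trackState` come from that sketch); the snapshot path, the tab-separated layout, and the `<latest>` placeholder are illustrative assumptions:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StateSpec

// Placeholder snapshot location.
val snapshotDir = "hdfs:///state-snapshots/my-streaming-job"

// 1. Periodically flush the full (key, state) view to HDFS. stateSnapshots()
//    emits it every batch; in practice this would be throttled (e.g. only
//    every N batches).
stateful.stateSnapshots().foreachRDD { (rdd, time) =>
  rdd.map { case (key, json) => s"$key\t$json" }
     .saveAsTextFile(s"$snapshotDir/${time.milliseconds}")
}

// 2. On upgrade, start the new version with a fresh checkpoint directory and
//    seed its state from the latest snapshot instead of the old checkpoint.
//    "<latest>" stands for whatever convention picks the newest snapshot.
val restored: RDD[(String, String)] = ssc.sparkContext
  .textFile(s"$snapshotDir/<latest>")
  .map { line =>
    val Array(key, json) = line.split("\t", 2)
    (key, json)
  }

val spec = StateSpec.function(trackState _).initialState(restored)
```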
