Re-run Spark jobs on Failure or Abort

I'm looking for a configuration or parameter that automatically restarts Spark jobs submitted via YARN in case of any failure. I know that tasks are restarted automatically on failure; what I am looking for is a YARN or Spark configuration that would trigger a re-run of the whole job.

Right now, if any of our jobs aborts for any reason, we have to restart it manually. That leaves a long backlog of data to process, since these jobs are designed to work in near real time.

Current configuration:



# Minimum TODOs on a per job basis:
# 1. define name, application jar path, main class, queue and log4j-yarn.properties path
# 2. remove properties not applicable to your Spark version (Spark 1.x vs. Spark 2.x)
# 3. tweak num_executors, executor_memory (+ overhead), and backpressure settings

# the two most important settings:

# 3-5 cores per executor is a good default balancing HDFS client throughput vs. JVM overhead
# see http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

# backpressure

/usr/hdp/ --master yarn --deploy-mode cluster \
  --name br1_warid_ccn_sms_production \
  --class com.spark.main \
  --driver-memory 16g \
  --num-executors ${num_executors} --executor-cores ${executor_cores} --executor-memory ${executor_memory} \
  --queue default \
  --files log4j-yarn-warid-br1-ccn-sms.properties \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-yarn-warid-br1-ccn-sms.properties" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-yarn-warid-br1-ccn-sms.properties" \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer `# Kryo Serializer is much faster than the default Java Serializer` \
  --conf spark.kryoserializer.buffer.max=1g \
  --conf spark.locality.wait=30 \
  --conf spark.task.maxFailures=8 `# Increase max task failures before failing job (Default: 4)` \
  --conf spark.ui.killEnabled=true `# Allow killing of stages and corresponding jobs from the Spark UI (Default: true)` \
  --conf spark.logConf=true `# Log Spark Configuration in driver log for troubleshooting` \
  --conf spark.scheduler.mode=FAIR \
  --conf spark.default.parallelism=32 \
  --conf spark.streaming.blockInterval=200 `# [Optional] Tweak to balance data processing parallelism vs. task scheduling overhead (Default: 200ms)` \
  --conf spark.streaming.receiver.writeAheadLog.enable=true `# Prevent data loss on driver recovery` \
  --conf spark.streaming.backpressure.enabled=false \
  --conf spark.streaming.kafka.maxRatePerPartition=${receiver_max_rate} `# [Spark 1.x]: Corresponding max rate setting for Direct Kafka Streaming (Default: not set)` \
  --conf spark.yarn.driver.memoryOverhead=4096 `# [Optional] Set if --driver-memory < 5GB` \
  --conf spark.yarn.executor.memoryOverhead=4096 `# [Optional] Set if --executor-memory < 10GB` \
  --conf spark.yarn.maxAppAttempts=4 `# Increase max application master attempts (needs to be <= yarn.resourcemanager.am.max-attempts in YARN, which defaults to 2) (Default: yarn.resourcemanager.am.max-attempts)` \
  --conf spark.yarn.am.attemptFailuresValidityInterval=1h `# Attempt counter considers only the last hour (Default: (none))` \
  --conf spark.yarn.max.executor.failures=$((8 * ${num_executors})) `# Increase max executor failures (Default: max(numExecutors * 2, 3))` \
  --conf spark.yarn.executor.failuresValidityInterval=1h `# Executor failure counter considers only the last hour` \
  --conf spark.speculation=false \
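The command above references ${num_executors}, ${executor_cores}, ${executor_memory} and ${receiver_max_rate}, which are not defined in the excerpt; presumably they are set earlier in the full script (and, per TODO 1 in the header, the application jar path still has to follow the last option). A minimal sketch of such a preamble is below. All values are hypothetical placeholders, and the check makes the script stop early instead of letting `$((8 * ${num_executors}))` abort with an arithmetic syntax error when a variable is empty.

```shell
#!/usr/bin/env bash
# Hypothetical values: size these for your own cluster and data rate.
num_executors=6          # number of executors (hypothetical)
executor_cores=3         # 3-5 cores per executor, as the script's comment suggests
executor_memory=8g       # heap per executor (hypothetical)
receiver_max_rate=1000   # max records/sec per Kafka partition (hypothetical)

# Fail fast if any variable the spark-submit command relies on is empty.
for var in num_executors executor_cores executor_memory receiver_max_rate; do
    if [[ -z "${!var}" ]]; then   # ${!var} = value of the variable named by $var
        echo "Required variable ${var} is not set" >&2
        exit 1
    fi
done

# Same arithmetic as the spark.yarn.max.executor.failures option above.
max_executor_failures=$((8 * num_executors))
echo "spark.yarn.max.executor.failures=${max_executor_failures}"
```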

Note: There are a couple of existing questions on this subject, but they either have no accepted answer or their answers deviate from the expected solution: "Running a Spark application on YARN, without spark-submit" and "How to configure automatic restart of the application driver on Yarn".

This question explores possible solutions within the scope of YARN and Spark.

1 Answer

Just a thought!

Let's call the script file containing the above command run_spark_job.sh.

Try adding these statements at the end of the script:


return_code=$?  # exit code of the spark-submit command; must come right after it

if [[ ${return_code} -ne 0 ]]; then
    echo "Job failed"
    exit ${return_code}
fi

echo "Job succeeded"
exit 0

Then create another script, spark_job_runner.sh, that calls the above script. For example:

./run_spark_job.sh
while [ $? -ne 0 ]; do ./run_spark_job.sh; done
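A runner like this retries forever, which can hide a job that fails deterministically (for example, after a bad deployment). A slightly more defensive sketch caps the number of attempts and waits between them; MAX_RETRIES, RETRY_DELAY_SECONDS and run_with_retries are hypothetical names, not Spark or YARN settings. It relies on spark-submit blocking until the application ends, which is the default in cluster mode (spark.yarn.submit.waitAppCompletion=true); if that is set to false, spark-submit returns right after submission and its exit code says nothing about the job's outcome.

```shell
#!/usr/bin/env bash
# Hypothetical knobs for the runner (not Spark/YARN configuration).
MAX_RETRIES="${MAX_RETRIES:-5}"
RETRY_DELAY_SECONDS="${RETRY_DELAY_SECONDS:-60}"

# run_with_retries CMD [ARGS...]: runs CMD until it exits 0 or it has
# failed MAX_RETRIES times; returns 0 or the last non-zero exit code.
run_with_retries() {
    local attempt=1
    local rc
    while true; do
        "$@"
        rc=$?
        if [[ ${rc} -eq 0 ]]; then
            echo "Attempt ${attempt} succeeded"
            return 0
        fi
        if (( attempt >= MAX_RETRIES )); then
            echo "Giving up after ${attempt} failed attempts (last exit code ${rc})" >&2
            return "${rc}"
        fi
        echo "Attempt ${attempt} failed with exit code ${rc}; retrying in ${RETRY_DELAY_SECONDS}s" >&2
        sleep "${RETRY_DELAY_SECONDS}"
        attempt=$((attempt + 1))
    done
}
```

In spark_job_runner.sh you would then call `run_with_retries ./run_spark_job.sh`. The attempt counter never resets, so a streaming job that fails once a week would eventually exhaust it; resetting `attempt` after a run that lasted longer than some threshold would mirror what spark.yarn.am.attemptFailuresValidityInterval does inside YARN.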

YARN-based approaches:

Update 1: This link is a good read. It discusses submitting and tracking Spark jobs through the YARN REST API: https://community.hortonworks.com/articles/28070/starting-spark-jobs-directly-via-yarn-rest-api.html

Update 2: This link shows how to submit a Spark application to a YARN environment from Java code: https://github.com/mahmoudparsian/data-algorithms-book/blob/master/misc/how-to-submit-spark-job-to-yarn-from-java-code.md
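Along the lines of Update 1, a watchdog (run from cron, for example) can poll the ResourceManager REST API and resubmit when the application is no longer running. The sketch below uses the Cluster Application State endpoint (GET /ws/v1/cluster/apps/{appid}/state, which returns JSON such as {"state":"RUNNING"}). RM_URL, app_state and needs_resubmit are hypothetical names, the host name is a placeholder, and obtaining the application ID (e.g. from the spark-submit log) is left out. On a Kerberized cluster curl would also need SPNEGO options such as `--negotiate -u :`.

```shell
#!/usr/bin/env bash
# Hypothetical ResourceManager address; 8088 is the usual RM web/REST port.
RM_URL="${RM_URL:-http://resourcemanager.example.com:8088}"

# app_state APP_ID: prints the YARN state (e.g. RUNNING, FAILED) of the
# application, or nothing if the RM response contains no "state" field.
app_state() {
    local app_id="$1"
    curl -s -H "Accept: application/json" "${RM_URL}/ws/v1/cluster/apps/${app_id}/state" \
        | sed -n 's/.*"state"[[:space:]]*:[[:space:]]*"\([A-Z_]*\)".*/\1/p'
}

# needs_resubmit APP_ID: succeeds (exit 0) if the application is neither
# queued nor running, i.e. a long-running streaming job should be resubmitted.
needs_resubmit() {
    case "$(app_state "$1")" in
        NEW|NEW_SAVING|SUBMITTED|ACCEPTED|RUNNING) return 1 ;;
        *) return 0 ;;  # FINISHED, FAILED, KILLED, or no usable answer from the RM
    esac
}
```

A watchdog would then run `./run_spark_job.sh` (in the background, since it blocks) whenever `needs_resubmit "${APP_ID}"` succeeds. While YARN is retrying the AM under spark.yarn.maxAppAttempts, the application ID stays the same and the state is not terminal, so the watchdog only steps in once YARN itself has given up.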

Spark-based programmatic approach:

How to use the programmatic spark submit capability

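Spark-based configuration approach for YARN:

The Spark parameter in YARN mode that controls restarting the whole application is spark.yarn.maxAppAttempts (spark.yarn.am.attemptFailuresValidityInterval, already used in the question, only controls how long a failed attempt counts toward that limit). It should not exceed the YARN ResourceManager parameter yarn.resourcemanager.am.max-attempts.

Excerpt from the official documentation (https://spark.apache.org/docs/latest/running-on-yarn.html):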

The maximum number of attempts that will be made to submit the application.
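Why the limit matters: as far as I know, the ResourceManager only honours a per-application attempt limit (which is what spark.yarn.maxAppAttempts sets) when it lies between 1 and yarn.resourcemanager.am.max-attempts; otherwise it falls back to the global value and logs a warning. The function below is a small model of that rule, not a YARN API, and effective_am_attempts is a hypothetical name.

```shell
#!/usr/bin/env bash
# effective_am_attempts REQUESTED GLOBAL_MAX: prints the number of AM attempts
# YARN would allow, modelling the ResourceManager's range check.
effective_am_attempts() {
    local requested="$1"    # spark.yarn.maxAppAttempts (0 if not set)
    local global_max="$2"   # yarn.resourcemanager.am.max-attempts (default 2)
    if (( requested >= 1 && requested <= global_max )); then
        echo "${requested}"
    else
        # Out of range (or unset): the global limit applies instead.
        echo "${global_max}"
    fi
}
```

For example, `effective_am_attempts 4 2` prints 2: with the question's spark.yarn.maxAppAttempts=4 and an unchanged yarn.resourcemanager.am.max-attempts of 2, the driver would be restarted only once unless the YARN setting is raised in yarn-site.xml.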

