
Re-run Spark jobs on Failure or Abort

I'm looking for a configuration or parameter that automatically restarts a Spark job submitted via YARN in case of any failure. I know that tasks are automatically retried on failure; what I need is a YARN or Spark configuration that triggers a re-run of the whole job.

Right now, if any of our jobs aborts due to an issue, we have to restart it manually, which causes a long data queue to build up, since these jobs are designed to work in near real-time.

Current configurations:

#!/bin/bash

export SPARK_MAJOR_VERSION=2

# Minimum TODOs on a per job basis:
# 1. define name, application jar path, main class, queue and log4j-yarn.properties path
# 2. remove properties not applicable to your Spark version (Spark 1.x vs. Spark 2.x)
# 3. tweak num_executors, executor_memory (+ overhead), and backpressure settings

# the two most important settings:
num_executors=6
executor_memory=32g

# 3-5 cores per executor is a good default balancing HDFS client throughput vs. JVM overhead
# see http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
executor_cores=2

# backpressure
receiver_min_rate=1
receiver_max_rate=10
receiver_initial_rate=10

/usr/hdp/2.6.1.0-129/spark2/bin/spark-submit --master yarn --deploy-mode cluster \
  --name br1_warid_ccn_sms_production \
  --class com.spark.main \
  --driver-memory 16g \
  --num-executors ${num_executors} --executor-cores ${executor_cores} --executor-memory ${executor_memory} \
  --queue default \
  --files log4j-yarn-warid-br1-ccn-sms.properties \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-yarn-warid-br1-ccn-sms.properties" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-yarn-warid-br1-ccn-sms.properties" \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer `# Kryo Serializer is much faster than the default Java Serializer` \
  --conf spark.kryoserializer.buffer.max=1g \
  --conf spark.locality.wait=30 \
  --conf spark.task.maxFailures=8 `# Increase max task failures before failing job (Default: 4)` \
  --conf spark.ui.killEnabled=true `# Allow killing of stages and corresponding jobs from the Spark UI` \
  --conf spark.logConf=true `# Log Spark Configuration in driver log for troubleshooting` \
`# SPARK STREAMING CONFIGURATION` \
  --conf spark.scheduler.mode=FAIR \
  --conf spark.default.parallelism=32 \
  --conf spark.streaming.blockInterval=200 `# [Optional] Tweak to balance data processing parallelism vs. task scheduling overhead (Default: 200ms)` \
  --conf spark.streaming.receiver.writeAheadLog.enable=true `# Prevent data loss on driver recovery` \
  --conf spark.streaming.backpressure.enabled=false \
  --conf spark.streaming.kafka.maxRatePerPartition=${receiver_max_rate} `# [Spark 1.x]: Corresponding max rate setting for Direct Kafka Streaming (Default: not set)` \
`# YARN CONFIGURATION` \
  --conf spark.yarn.driver.memoryOverhead=4096 `# [Optional] Set if --driver-memory < 5GB` \
  --conf spark.yarn.executor.memoryOverhead=4096 `# [Optional] Set if --executor-memory < 10GB` \
  --conf spark.yarn.maxAppAttempts=4 `# Increase max application master attempts (needs to be <= yarn.resourcemanager.am.max-attempts in YARN, which defaults to 2) (Default: yarn.resourcemanager.am.max-attempts)` \
  --conf spark.yarn.am.attemptFailuresValidityInterval=1h `# Attempt counter considers only the last hour (Default: (none))` \
  --conf spark.yarn.max.executor.failures=$((8 * ${num_executors})) `# Increase max executor failures (Default: max(numExecutors * 2, 3))` \
  --conf spark.yarn.executor.failuresValidityInterval=1h `# Executor failure counter considers only the last hour` \
  --conf spark.speculation=false \
/home//runscripts/production.jar

Note: There are a couple of questions in this subject area, but they either have no accepted answers or the answers deviate from the expected solution:

Running a Spark application on YARN, without spark-submit
How to configure automatic restart of the application driver on Yarn

This question explores the possible solutions within the scope of YARN and Spark.

Asked Oct 29 '17 by Imran


People also ask

How do I restart a failed Spark job?

When you have failed tasks, you need to find the Stage that the tasks belong to. To do this, click on Stages in the Spark UI and then look for the Failed Stages section at the bottom of the page. If an executor runs into memory issues, it will fail the task and restart where the last task left off.

What happens if a Spark job fails?

Failure of worker node – A worker node is a node that runs the application code on the Spark cluster; these are the slave nodes. Any worker node running an executor can fail, resulting in the loss of in-memory data. If any receivers were running on the failed node, their buffered data will be lost.

How do you troubleshoot failed Spark jobs?

Resolution: Navigate to the Analyze page. Click on the Resources tab to analyze the errors and perform the appropriate action. Run the job again using the Spark Submit Command Line Options on the Analyze page.


1 Answer

Just a thought!

Let's call the script file (containing the spark-submit command above) run_spark_job.sh.

Try adding these statements at the end of that script, immediately after the spark-submit command:

# capture the exit code of the spark-submit command above
return_code=$?

if [[ ${return_code} -ne 0 ]]; then
    echo "Job failed"
    exit ${return_code}
fi

echo "Job succeeded"
exit 0

Let's have another script file, spark_job_runner.sh, from which we call the above script. For example:

# run once, then keep re-running as long as the previous attempt exited non-zero
./run_spark_job.sh
while [ $? -ne 0 ]; do
    ./run_spark_job.sh
done
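
A slightly more defensive variant of spark_job_runner.sh (just a sketch, assuming the same run_spark_job.sh, and that spark-submit blocks until the application finishes, i.e. spark.yarn.submit.waitAppCompletion is left at its default of true) caps the number of retries and sleeps between attempts, so a permanently broken job does not re-submit in a tight loop:

#!/bin/bash
# spark_job_runner.sh (sketch): retry the job up to max_retries times, pausing between attempts
max_retries=5        # hypothetical limit; tune to your needs
retry_delay=60       # seconds to wait before re-submitting

attempt=1
until ./run_spark_job.sh; do
    if [ ${attempt} -ge ${max_retries} ]; then
        echo "Job still failing after ${max_retries} attempts, giving up"
        exit 1
    fi
    echo "Attempt ${attempt} failed, retrying in ${retry_delay}s..."
    attempt=$((attempt + 1))
    sleep ${retry_delay}
done

echo "Job succeeded"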

YARN-based approaches:

Update 1: This link is a good read; it discusses using the YARN REST API to submit and track applications: https://community.hortonworks.com/articles/28070/starting-spark-jobs-directly-via-yarn-rest-api.html
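
For example, one can watch a running application's status through the ResourceManager REST API and re-submit when it ends badly. A minimal, hypothetical sketch (the ResourceManager host/port and application id below are placeholders; GET /ws/v1/cluster/apps/{appid} is part of the standard YARN ResourceManager REST API):

# Poll the final status of a known application id (placeholder values)
RM=http://resourcemanager-host:8088
APP_ID=application_1509264735000_0001

curl -s "${RM}/ws/v1/cluster/apps/${APP_ID}" | grep -o '"finalStatus":"[A-Z_]*"'
# prints e.g. "finalStatus":"SUCCEEDED" or "finalStatus":"FAILED"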

Update 2: This link shows how to submit a Spark application to a YARN environment using Java code: https://github.com/mahmoudparsian/data-algorithms-book/blob/master/misc/how-to-submit-spark-job-to-yarn-from-java-code.md

Spark-based programmatic approach:

How to use the programmatic spark submit capability

Spark-based configuration approach for YARN:

The only Spark parameter in YARN mode that controls restarting the application is spark.yarn.maxAppAttempts, and it should not exceed the YARN ResourceManager parameter yarn.resourcemanager.am.max-attempts.

Excerpt from the official documentation (https://spark.apache.org/docs/latest/running-on-yarn.html), describing spark.yarn.maxAppAttempts:

The maximum number of attempts that will be made to submit the application.
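
Putting it together, a minimal sketch of the restart-related settings (the yarn-site.xml path is an assumption, the class name and jar are reused from the question, and spark.yarn.maxAppAttempts must stay at or below yarn.resourcemanager.am.max-attempts, which defaults to 2):

# Check the cluster-wide ceiling first (config path is an assumption)
grep -A1 'yarn.resourcemanager.am.max-attempts' /etc/hadoop/conf/yarn-site.xml

spark-submit --master yarn --deploy-mode cluster \
  --conf spark.yarn.maxAppAttempts=2 \
  --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
  --class com.spark.main \
  production.jar

Note that this only re-runs the whole job in cluster deploy mode, where the driver runs inside the YARN application master, so a new application attempt restarts the driver and hence the job.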

Answered Sep 21 '22 by Marco99