I'm writing a simple spark application that uses some input RDD, sends it to an external script via pipe, and writes an output of that script to a file. Driver code looks like this:
val input = args(0)
val scriptPath = args(1)
val output = args(2)
val sc = getSparkContext
if (args.length == 4) {
//Here I pass an additional argument which contains an absolute path to a script on my local machine, only for local testing
sc.addFile(args(3))
}
sc.textFile(input).pipe(Seq("python2", SparkFiles.get(scriptPath))).saveAsTextFile(output)
When I run it on my local machine it works fine. But when I submit it to a YARN cluster via
spark-submit --master yarn --deploy-mode cluster --files /absolute/path/to/local/test.py --class somepackage.PythonLauncher path/to/driver.jar path/to/input/part-* test.py path/to/output`
it fails with an exception.
Lost task 1.0 in stage 0.0 (TID 1, rwds2.1dmp.ru): java.lang.Exception: Subprocess exited with status 2
I've tried different variations of the pipe command. For instance, .pipe("cat")
works fine, and behaves as expected, but .pipe(Seq("cat", scriptPath))
also fails with error code 1, so it seems that spark can't figure out a path to the script on a cluster node.
Any suggestions?
In cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
If you use yarn as manager on a cluster with multiple nodes you do not need to install spark on each node. Yarn will distribute the spark binaries to the nodes when a job is submitted. Running Spark on YARN requires a binary distribution of Spark which is built with YARN support.
You can submit a Spark batch application by using cluster mode (default) or client mode either inside the cluster or from an external client: Cluster mode (default): Submitting Spark batch application and having the driver run on a host in your driver resource group. The spark-submit syntax is --deploy-mode cluster.
I don't use python myself but I find some clues may be useful for you (in the source code of Spark-1.3
SparkSubmitArguments)
--py-files PY_FILES
, Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps.
--files FILES
, Comma-separated list of files to be placed in the working directory of each executor.
--archives ARCHIVES
, Comma separated list of archives to be extracted into the working directory of each executor.
And also, your arguments to spark-submit
should follow this style:
Usage: spark-submit [options] <app jar | python file> [app arguments]
To understand why, you must get familiar with the differences of the three running mode of spark, eg. standalone, yarn-client, yarn-cluster.
As with standalone and yarn-client, driver program runs at the current location of your local machine while worker program runs somewhere else(standalone maybe another temp directory under $SPARK_HOME, yarn-client maybe a random node in the cluster), so you can access local file with local path specified in the driver program but not in the worker program.
However, when you run with yarn-cluster mode, both your driver and worker program run at a random cluster node, local files are relative to their working machine and directory, thereby a file-not-found exception throws, you need to archive these files with either --files or --archive when submitting, or just archive them in .egg or .jar yourself before submit, or use addFile api in your driver program like this.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With