I'm using Typesafe Config, https://github.com/typesafehub/config, to parameterize a Spark job running in yarn-cluster mode with a configuration file. The default behavior of Typesafe Config is to search the classpath for resources with names matching a regex and to load them into your configuration class automatically with ConfigFactory.load() (for our purposes, assume the file it looks for is called application.conf).
I am able to load the configuration file into the driver using --driver-class-path <directory containing configuration file>, but using --conf spark.executor.extraClassPath=<directory containing configuration file> does not put the resource on the classpath of all executors as I expected. The executors report that they cannot find a configuration setting for a key that does exist in the configuration file I'm attempting to add to their classpaths.
What is the correct way to add a file to the classpaths of all executor JVMs using Spark?
It looks like the value of the spark.executor.extraClassPath property is relative to the working directory of the application ON THE EXECUTOR.
So, to use this property correctly, one should use --files <configuration file> to first direct Spark to copy the file to the working directory of all executors, then use spark.executor.extraClassPath=./ to add the executor's working directory to its classpath. This combination results in the executor being able to read values from the configuration file.
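As a rough way to check that the --files plus extraClassPath combination works, something like the following could be run. It is only a sketch: the object name ConfigCheck, the key myapp.some-key, and the paths in the comment are made up for illustration and need to be replaced with your own.

```scala
import com.typesafe.config.ConfigFactory
import org.apache.spark.{SparkConf, SparkContext}

// Assumed submit options (the path is a placeholder):
//   --files /path/to/application.conf
//   --conf spark.executor.extraClassPath=./
object ConfigCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("config-check"))

    // The function passed to map runs inside the executor JVMs, so
    // ConfigFactory.load() searches each executor's classpath here,
    // not the driver's.
    val values = sc.parallelize(1 to 4, 4)
      .map(_ => ConfigFactory.load().getString("myapp.some-key")) // hypothetical key
      .distinct()
      .collect()

    // One distinct value means every task saw the same setting.
    values.foreach(println)
    sc.stop()
  }
}
```

If the file is not visible to an executor, getString should fail there with a missing-key exception, which matches the symptom described in the question.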
I use the SparkContext addFile method:

    import org.apache.spark.{SparkConf, SparkContext}

    val sc: SparkContext = {
      val sparkConf = new SparkConf()
        .set("spark.storage.memoryFraction", "0.3")
        .set("spark.driver.maxResultSize", "10g")
        .set("spark.default.parallelism", "1024")
        .setAppName("myproject")
      // getMaster is my own helper that returns the master URL
      sparkConf.setMaster(getMaster)
      val sc = new SparkContext(sparkConf)
      // Ship application.conf to every node that runs this job
      sc.addFile("application.conf")
      sc
    }
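One caveat, as far as I can tell: addFile distributes the file but does not by itself put it on the executor classpath, so ConfigFactory.load() may still not see it. A possible way around that is to look up the local copy with SparkFiles.get and parse it explicitly. This is a sketch under that assumption; the object name ShippedConfig and its load method are made up for illustration.

```scala
import java.io.File

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkFiles

// Hypothetical helper: parses a file that was shipped with sc.addFile.
object ShippedConfig {
  def load(name: String = "application.conf"): Config = {
    // SparkFiles.get returns the absolute local path of the shipped copy
    val localCopy = new File(SparkFiles.get(name))
    // parseFile reads that file directly instead of searching the classpath
    ConfigFactory.parseFile(localCopy).resolve()
  }
}
```

It would be called from code that runs on the executors, for example inside mapPartitions, so that SparkFiles.get resolves the path on the node where the task is running.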