I have a Spark 1.5.2 application that streams data from Kafka to HDFS. It uses two Typesafe Config files for settings such as the Kafka topic.
Now I want to run the application with spark-submit in cluster mode. The jar file with all of my project's dependencies is stored on HDFS. As long as the config files are included in the jar, everything works fine, but this is impractical for testing because I have to rebuild the jar for every change.
Therefore I excluded the config files from the jar and added them via "--driver-class-path". This worked in client mode, but now that I have moved the config files to HDFS and run the application in cluster mode, it can't find the settings. Below is my spark-submit command:
/usr/local/spark/bin/spark-submit \
--total-executor-cores 10 \
--executor-memory 15g \
--verbose \
--deploy-mode cluster \
--class com.hdp.speedlayer.SpeedLayerApp \
--driver-class-path hdfs://iot-master:8020/user/spark/config \
--master spark://spark-master:6066 \
hdfs://iot-master:8020/user/spark/speed-layer-CONFIG.jar
I already tried the --files parameter, but that didn't work either. Does anybody know how I can fix this?
Update:
I did some further research and figured out that it could be related to the HDFS path. I changed the HDFS path to "hdfs:///iot-master:8020//user//spark//config", but unfortunately that didn't work either. Maybe this detail helps.
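As a side note on the two spellings of the path, a small sketch with java.net.URI shows how differently they parse. The object and method names are made up for illustration, and Hadoop's own Path class may normalize the duplicate slashes differently, so treat this only as a hint about where the host name ends up.

```scala
import java.net.URI

object HdfsUriCheck {
  // Returns (host, port, path); host is None when the URI has no authority part.
  def describe(location: String): (Option[String], Int, String) = {
    val uri = new URI(location)
    (Option(uri.getHost), uri.getPort, uri.getPath)
  }

  def main(args: Array[String]): Unit = {
    // Two slashes: "iot-master:8020" is parsed as host and port.
    println(describe("hdfs://iot-master:8020/user/spark/config"))
    // Three slashes: the authority is empty, so "iot-master:8020" becomes part of the path.
    println(describe("hdfs:///iot-master:8020//user//spark//config"))
  }
}
```

With the triple slash no host is parsed at all, so, as far as I know, Hadoop would fall back to the default file system from its own configuration rather than contacting iot-master.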
Below you can also see the error I get when I run the driver program in cluster mode:
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.ExceptionInInitializerError
at com.speedlayer.SpeedLayerApp.main(SpeedLayerApp.scala)
... 6 more
Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'application'
at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:145)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)
...
While trying to achieve the same result, I found out the following:
As far as I can tell, --files ends up as a conf.addFile() call, so HDFS files won't work unless you can run hdfs dfs -get <....> beforehand to retrieve the file. In my case I want to run the job from Oozie, so I don't know which machine it will run on, and I don't want to add a copy-file action to my workflow. Jars, by comparison, go through conf.addJar().
So as far as I know, there is no straightforward way to load a configuration file from HDFS.
My approach was to pass the path of the configuration file to my application, read the file there, and merge it with the reference configuration:
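import java.io.File
import java.net.URI
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Place the members below inside the object or class that bootstraps the application.
private val HDFS_IMPL_KEY = "fs.hdfs.impl"

// Copy the config file to a local temp file, then let Typesafe Config merge it
// with reference.conf and system properties.
def loadConf(pathToConf: String): Config = {
  val path = new Path(pathToConf)
  val confFile = File.createTempFile(path.getName, "tmp")
  confFile.deleteOnExit()
  getFileSystemByUri(path.toUri).copyToLocalFile(path, new Path(confFile.getAbsolutePath))
  ConfigFactory.load(ConfigFactory.parseFile(confFile))
}

// Register the HDFS implementation explicitly so that hdfs:// URIs can be resolved.
def getFileSystemByUri(uri: URI): FileSystem = {
  val hdfsConf = new Configuration()
  hdfsConf.set(HDFS_IMPL_KEY, classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
  FileSystem.get(uri, hdfsConf)
}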
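A possible variation on the same idea, if you would rather avoid the temporary file, is to parse the configuration straight from the HDFS input stream. This is only a sketch under the assumption that the default Hadoop Configuration on the classpath can already resolve the file system of the given path; the object name HdfsConfReader is hypothetical.

```scala
import java.io.InputStreamReader
import java.nio.charset.StandardCharsets
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HdfsConfReader {
  // Opens the file through the Hadoop FileSystem API and parses it in memory.
  def loadConf(pathToConf: String): Config = {
    val path = new Path(pathToConf)
    // FileSystem.get returns a cached instance, so it is deliberately not closed here.
    val fs = FileSystem.get(path.toUri, new Configuration())
    val reader = new InputStreamReader(fs.open(path), StandardCharsets.UTF_8)
    try {
      // load(...) layers the parsed file over reference.conf and resolves substitutions.
      ConfigFactory.load(ConfigFactory.parseReader(reader))
    } finally {
      reader.close()
    }
  }
}
```

The path would then be passed as an ordinary application argument after the jar in the spark-submit command and read with something like HdfsConfReader.loadConf(args(0)).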
P.S. The error only means that ConfigFactory didn't find any configuration file, so it couldn't find the property you are looking for.
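To illustrate that point, here is a minimal sketch that reproduces the same kind of exception with an empty configuration. The key application.kafka.topic and the object name are assumptions made up for the example; only the top-level key 'application' appears in the stack trace.

```scala
import com.typesafe.config.{Config, ConfigException, ConfigFactory}

object MissingKeyDemo {
  // Returns Left with the library's message when the key is absent, Right with the value otherwise.
  def topic(config: Config): Either[String, String] =
    try Right(config.getString("application.kafka.topic"))
    catch { case e: ConfigException.Missing => Left(e.getMessage) }

  def main(args: Array[String]): Unit = {
    // Nothing was loaded: the lookup fails with ConfigException.Missing.
    println(topic(ConfigFactory.empty()))
    // Once the settings are present, the same lookup succeeds.
    println(topic(ConfigFactory.parseString("application.kafka.topic = events")))
  }
}
```

Calling config.hasPath("application") right after loading is a cheap way to fail early with a clearer message of your own.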
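One option is to pass the HDFS location with the --files flag and tell the executor JVMs where to find the file with -Dconfig.file. Because -Dconfig.file is a JVM option, it belongs in spark.executor.extraJavaOptions; spark.executor.extraClassPath only takes classpath entries. The Spark documentation describes how URIs are handled: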
Spark uses the following URL scheme to allow different strategies for disseminating jars:
- file: - Absolute paths and file:/ URIs are served by the driver’s HTTP file server, and every executor pulls the file from the driver HTTP server.
- hdfs:, http:, https:, ftp: - these pull down files and JARs from the URI as expected
- local: - a URI starting with local:/ is expected to exist as a local file on each worker node. This means that no network IO will be incurred, and works well for large files/JARs that are pushed to each worker, or shared via NFS, GlusterFS, etc.
The help output of spark-submit also describes the --files flag:
--files FILES Comma-separated list of files to be placed in the working
directory of each executor.
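The URL scheme rules quoted above can be summarized in a few lines of Scala. This is not Spark's implementation, just an illustrative sketch of the three strategies; the object, trait, and method names are hypothetical.

```scala
import java.net.URI

object UriSchemes {
  sealed trait Distribution
  case object ServedByDriver extends Distribution   // file: URIs and bare absolute paths
  case object FetchedFromUri extends Distribution   // hdfs:, http:, https:, ftp:
  case object AlreadyOnWorkers extends Distribution // local:
  case object Unknown extends Distribution

  // Maps a location string to the distribution strategy described in the quoted list.
  def classify(location: String): Distribution =
    Option(new URI(location).getScheme).map(_.toLowerCase) match {
      case None | Some("file")                     => ServedByDriver
      case Some("hdfs" | "http" | "https" | "ftp") => FetchedFromUri
      case Some("local")                           => AlreadyOnWorkers
      case Some(_)                                 => Unknown
    }
}
```

For example, UriSchemes.classify("hdfs://iot-master:8020/user/spark/config") falls into the second group, which is the case the question is about.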
Running with spark-submit:
/usr/local/spark/bin/spark-submit \
--total-executor-cores 10 \
--executor-memory 15g \
--conf "spark.executor.extraJavaOptions=-Dconfig.file=application.conf" \
--verbose \
--deploy-mode cluster \
--class com.hdp.speedlayer.SpeedLayerApp \
--driver-class-path hdfs://iot-master:8020/user/spark/config \
--files hdfs:/path/to/conf \
--master spark://spark-master:6066 \
hdfs://iot-master:8020/user/spark/speed-layer-CONFIG.jar
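On the application side, the shipped file still has to be picked up. A minimal sketch, assuming the file is called application.conf as in the command above and ends up in the working directory of the JVM that reads it; whether that holds for the driver in standalone cluster mode is something I have not verified. The object name WorkingDirConf is hypothetical.

```scala
import java.io.File
import com.typesafe.config.{Config, ConfigFactory}

object WorkingDirConf {
  // Prefers a file in the process working directory and falls back to the classpath.
  def load(fileName: String = "application.conf"): Config = {
    val shipped = new File(fileName) // a relative name resolves against the working directory
    if (shipped.isFile) ConfigFactory.load(ConfigFactory.parseFile(shipped))
    else ConfigFactory.load()
  }
}
```

The fallback keeps the application usable when the config is still bundled in the jar, which matches the setup that already works in the question.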