I have an Apache Mesos 0.22.1 cluster (3 masters and 5 slaves) running Cloudera HDFS (2.5.0-cdh5.3.1) in an HA configuration, with the Spark 1.5.1 framework.
When I try to spark-submit the compiled HdfsTest.scala example app (from the Spark 1.5.1 sources), it fails with java.lang.IllegalArgumentException: java.net.UnknownHostException: hdfs in the executor logs. The error only appears when I pass the HDFS HA path as an argument, hdfs://hdfs/<file>; when I pass hdfs://namenode1.hdfs.mesos:50071/testfile, everything works fine.
What I found after enabling TRACE logging is that the Spark driver actually resolves the hdfs://hdfs URL correctly, but the Spark executor doesn't.
My Scala app code:
import org.apache.spark._

object HdfsTest {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HdfsTest")
    val sc = new SparkContext(sparkConf)
    val file = sc.textFile(args(0))
    val mapped = file.map(s => s.length).cache()
    for (iter <- 1 to 10) {
      val start = System.currentTimeMillis()
      for (x <- mapped) { x + 2 }
      val end = System.currentTimeMillis()
      println("Iteration " + iter + " took " + (end - start) + " ms")
    }
    sc.stop()
  }
}
I compile this code and submit the jar file to Spark in cluster mode:
/opt/spark/bin/spark-submit --deploy-mode cluster --class com.cisco.hdfs.HdfsTest http://1.2.3.4/HdfsTest-0.0.1.jar hdfs://hdfs/testfile
My spark-defaults.conf file:
spark.master spark://1.2.3.4:7077
spark.eventLog.enabled true
spark.driver.memory 1g
My spark-env.sh file:
export HADOOP_HOME=/opt/spark
export HADOOP_CONF_DIR=/opt/spark/conf
I have Spark deployed on each slave in the /opt/spark directory.
I can access HDFS with the "hdfs dfs -ls hdfs://hdfs/" command in the console, without needing to specify the active namenode address and port.
core-site.xml:
----------------------------------------------------------------------
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://hdfs</value>
</property>
</configuration>
hdfs-site.xml:
----------------------------------------------------------------------
<configuration>
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.nameservice.id</name>
<value>hdfs</value>
</property>
<property>
<name>dfs.nameservices</name>
<value>hdfs</value>
</property>
<property>
<name>dfs.ha.namenodes.hdfs</name>
<value>nn1,nn2</value>
</property>
<property>
<name>dfs.namenode.rpc-address.hdfs.nn1</name>
<value>namenode1.hdfs.mesos:50071</value>
</property>
<property>
<name>dfs.namenode.http-address.hdfs.nn1</name>
<value>namenode1.hdfs.mesos:50070</value>
</property>
<property>
<name>dfs.namenode.rpc-address.hdfs.nn2</name>
<value>namenode2.hdfs.mesos:50071</value>
</property>
<property>
<name>dfs.namenode.http-address.hdfs.nn2</name>
<value>namenode2.hdfs.mesos:50070</value>
</property>
<property>
<name>dfs.client.failover.proxy.provider.hdfs</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://journalnode1.hdfs.mesos:8485;journalnode2.hdfs.mesos:8485;journalnode3.hdfs.mesos:8485/hdfs</value>
</property>
<property>
<name>ha.zookeeper.quorum</name>
<value>master.mesos:2181</value>
</property>
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/var/lib/hdfs/data/jn</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///var/lib/hdfs/data/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:///var/lib/hdfs/data/data</value>
</property>
<property>
<name>dfs.ha.fencing.methods</name>
<value>shell(/bin/true)</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
<property>
<name>dfs.datanode.du.reserved</name>
<value>10485760</value>
</property>
<property>
<name>dfs.datanode.balance.bandwidthPerSec</name>
<value>41943040</value>
</property>
<property>
<name>dfs.namenode.safemode.threshold-pct</name>
<value>0.90</value>
</property>
<property>
<name>dfs.namenode.heartbeat.recheck-interval</name>
<value>60000</value>
</property>
<property>
<name>dfs.datanode.handler.count</name>
<value>10</value>
</property>
<property>
<name>dfs.namenode.handler.count</name>
<value>20</value>
</property>
<property>
<name>dfs.image.compress</name>
<value>true</value>
</property>
<property>
<name>dfs.image.compression.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
<property>
<name>dfs.namenode.invalidate.work.pct.per.iteration</name>
<value>0.35f</value>
</property>
<property>
<name>dfs.namenode.replication.work.multiplier.per.iteration</name>
<value>4</value>
</property>
<property>
<name>dfs.namenode.datanode.registration.ip-hostname-check</name>
<value>false</value>
</property>
<property>
<name>dfs.client.read.shortcircuit</name>
<value>true</value>
</property>
<property>
<name>dfs.client.read.shortcircuit.streams.cache.size</name>
<value>1000</value>
</property>
<property>
<name>dfs.client.read.shortcircuit.streams.cache.size.expiry.ms</name>
<value>1000</value>
</property>
<property>
<name>dfs.domain.socket.path</name>
<value>/var/run/hadoop-hdfs/dn._PORT</value>
</property>
</configuration>
On the surface this error indicates a DNS resolution failure: the Java HDFS client tried to resolve "hdfs" as a hostname and got no answer. But "hdfs" here is not a host at all, it is the logical HA nameservice; the client can only map it to nn1/nn2 if it sees the dfs.nameservices and dfs.ha.* settings from hdfs-site.xml. The driver has those settings (via HADOOP_CONF_DIR), the executors do not, so they fall back to treating "hdfs" as a NameNode hostname and fail.
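To make that concrete, here is a minimal standalone sketch (my illustration, not code from the question) of what an executor without hdfs-site.xml effectively ends up doing:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object NameserviceRepro {
  def main(args: Array[String]): Unit = {
    // Start from an empty configuration: no core-site.xml / hdfs-site.xml on the classpath,
    // which is roughly the situation of an executor that never received those files.
    val conf = new Configuration(false)
    conf.set("fs.defaultFS", "hdfs://hdfs") // only the logical nameservice URI is known
    // Without dfs.nameservices / dfs.ha.namenodes.hdfs / dfs.namenode.rpc-address.hdfs.* set,
    // the HDFS client treats "hdfs" as a plain NameNode hostname and typically fails with
    // java.lang.IllegalArgumentException: java.net.UnknownHostException: hdfs
    val fs = FileSystem.get(conf)
    println(fs.exists(new Path("/testfile")))
  }
}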
I've found the solution: adding

spark.files file:///opt/spark/conf/hdfs-site.xml,file:///opt/spark/conf/core-site.xml

to conf/spark-defaults.conf on each slave solves the problem.
After that, the executors successfully download core-site.xml and hdfs-site.xml from the driver program and pick up the HA nameservice configuration.
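As a related sketch (same nameservice and namenode values as in the hdfs-site.xml above, so treat it as an illustration rather than a drop-in): Spark copies any property prefixed with spark.hadoop. into the Hadoop configuration used by executors, so the HA client settings can also be embedded in the application's SparkConf instead of shipping the XML files:

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("HdfsTest")
  // Everything after the "spark.hadoop." prefix lands in the executors' Hadoop Configuration.
  .set("spark.hadoop.fs.defaultFS", "hdfs://hdfs")
  .set("spark.hadoop.dfs.nameservices", "hdfs")
  .set("spark.hadoop.dfs.ha.namenodes.hdfs", "nn1,nn2")
  .set("spark.hadoop.dfs.namenode.rpc-address.hdfs.nn1", "namenode1.hdfs.mesos:50071")
  .set("spark.hadoop.dfs.namenode.rpc-address.hdfs.nn2", "namenode2.hdfs.mesos:50071")
  .set("spark.hadoop.dfs.client.failover.proxy.provider.hdfs",
    "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")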
If Spark is not given your Hadoop configuration, it uses Hadoop's built-in default for fs.defaultFS, which is the local file:// filesystem.
For it to honor HDFS HA, you need to pass both core-site.xml and hdfs-site.xml to the SparkContext, either via the CLASSPATH or as below (make sure these files are available on the local slave nodes at the same location, e.g. /config/core-site.xml and /config/hdfs-site.xml).
For example, in Spark 1.x:

val sc = new SparkContext(sparkConf)

In Spark 2.x:

val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
val sc = sparkSession.sparkContext

In either case:

sc.hadoopConfiguration.addResource(new org.apache.hadoop.fs.Path("/config/core-site.xml"))
sc.hadoopConfiguration.addResource(new org.apache.hadoop.fs.Path("/config/hdfs-site.xml"))
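A quick driver-side sanity check (a hypothetical snippet; the path and values are taken from the question above) to confirm the HA mapping was actually picked up before reading the HA path:

// Should print "hdfs" and "hdfs://hdfs" if core-site.xml / hdfs-site.xml were loaded.
println(sc.hadoopConfiguration.get("dfs.nameservices"))
println(sc.hadoopConfiguration.get("fs.defaultFS"))
// Should now resolve the nameservice instead of throwing UnknownHostException: hdfs.
println(sc.textFile("hdfs://hdfs/testfile").count())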