
Accessing HDFS HA from spark job (UnknownHostException error)

I have an Apache Mesos 0.22.1 cluster (3 masters & 5 slaves) running Cloudera HDFS (2.5.0-cdh5.3.1) in an HA configuration, and the Spark 1.5.1 framework.

When I try to spark-submit the compiled HdfsTest.scala example app (from the Spark 1.5.1 sources), it fails with a java.lang.IllegalArgumentException: java.net.UnknownHostException: hdfs error in the executor logs. The error only occurs when I pass the HDFS HA path as an argument (hdfs://hdfs/<file>); when I pass hdfs://namenode1.hdfs.mesos:50071/testfile, everything works fine.

What I found after enabling TRACE logging is that the Spark driver actually resolves the hdfs://hdfs URL correctly, but the Spark executor doesn't.
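
For what it's worth, one quick way to see the mismatch is to compare what fs.defaultFS resolves to on the driver versus on an executor. A rough sketch (not part of the original app; it just assumes a running SparkContext named sc):

import org.apache.hadoop.conf.Configuration

// Driver side: reads the configuration from HADOOP_CONF_DIR / the driver classpath
println("driver fs.defaultFS = " + sc.hadoopConfiguration.get("fs.defaultFS"))

// Executor side: a fresh Configuration picks up whatever core-site.xml (if any)
// happens to be on the executor's classpath
val executorView = sc.parallelize(Seq(1), 1)
  .map(_ => new Configuration().get("fs.defaultFS"))
  .collect()
  .head
println("executor fs.defaultFS = " + executorView)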

My Scala app code:

import org.apache.spark._
object HdfsTest {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HdfsTest")
    val sc = new SparkContext(sparkConf)
    val file = sc.textFile(args(0))
    val mapped = file.map(s => s.length).cache()
    for (iter <- 1 to 10) {
      val start = System.currentTimeMillis()
      for (x <- mapped) { x + 2 }
      val end = System.currentTimeMillis()
      println("Iteration " + iter + " took " + (end-start) + " ms")
    }
    sc.stop()
  }
}

I compile this code and submit the jar file to Spark in cluster mode:

/opt/spark/bin/spark-submit --deploy-mode cluster --class com.cisco.hdfs.HdfsTest http://1.2.3.4/HdfsTest-0.0.1.jar hdfs://hdfs/testfile

My spark-defaults.conf file:

spark.master                     spark://1.2.3.4:7077
spark.eventLog.enabled           true
spark.driver.memory              1g

My spark-env.sh file:

export HADOOP_HOME=/opt/spark
export HADOOP_CONF_DIR=/opt/spark/conf

I have Spark deployed on each slave in the /opt/spark directory.

I can access HDFS with the "hdfs dfs -ls hdfs://hdfs/" command in the console, without having to specify the active namenode address and port.

core-site.xml:
----------------------------------------------------------------------
<configuration>
 <property>
  <name>fs.default.name</name>
  <value>hdfs://hdfs</value>
 </property>
</configuration>

hdfs-site.xml:
----------------------------------------------------------------------
<configuration>
 <property>
  <name>dfs.ha.automatic-failover.enabled</name>
  <value>true</value>
 </property>

 <property>
  <name>dfs.nameservice.id</name>
  <value>hdfs</value>
 </property>

 <property>
  <name>dfs.nameservices</name>
  <value>hdfs</value>
 </property>

 <property>
  <name>dfs.ha.namenodes.hdfs</name>
  <value>nn1,nn2</value>
 </property>

 <property>
  <name>dfs.namenode.rpc-address.hdfs.nn1</name>
  <value>namenode1.hdfs.mesos:50071</value>
 </property>

 <property>
  <name>dfs.namenode.http-address.hdfs.nn1</name>
  <value>namenode1.hdfs.mesos:50070</value>
 </property>

 <property>
  <name>dfs.namenode.rpc-address.hdfs.nn2</name>
  <value>namenode2.hdfs.mesos:50071</value>
 </property>

 <property>
  <name>dfs.namenode.http-address.hdfs.nn2</name>
  <value>namenode2.hdfs.mesos:50070</value>
 </property>

 <property>
  <name>dfs.client.failover.proxy.provider.hdfs</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
 </property>

 <property>
  <name>dfs.namenode.shared.edits.dir</name>
     <value>qjournal://journalnode1.hdfs.mesos:8485;journalnode2.hdfs.mesos:8485;journalnode3.hdfs.mesos:8485/hdfs</value>
   </property>

 <property>
   <name>ha.zookeeper.quorum</name>
   <value>master.mesos:2181</value>
 </property>

 <property>
  <name>dfs.journalnode.edits.dir</name>
  <value>/var/lib/hdfs/data/jn</value>
 </property>

 <property>
   <name>dfs.namenode.name.dir</name>
   <value>file:///var/lib/hdfs/data/name</value>
 </property>

 <property>
   <name>dfs.datanode.data.dir</name>
   <value>file:///var/lib/hdfs/data/data</value>
 </property>

 <property>
  <name>dfs.ha.fencing.methods</name>
  <value>shell(/bin/true)</value>
 </property>

 <property>
  <name>dfs.permissions</name>
  <value>false</value>
 </property>

 <property>
  <name>dfs.datanode.du.reserved</name>
  <value>10485760</value>
 </property>

 <property>
  <name>dfs.datanode.balance.bandwidthPerSec</name>
  <value>41943040</value>
 </property>

 <property>
   <name>dfs.namenode.safemode.threshold-pct</name>
   <value>0.90</value>
 </property>

 <property>
  <name>dfs.namenode.heartbeat.recheck-interval</name>
  <value>60000</value>
 </property>

 <property>
  <name>dfs.datanode.handler.count</name>
  <value>10</value>
 </property>

 <property>
  <name>dfs.namenode.handler.count</name>
  <value>20</value>
 </property>

 <property>
  <name>dfs.image.compress</name>
  <value>true</value>
 </property>

 <property>
  <name>dfs.image.compression.codec</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
 </property>

 <property>
  <name>dfs.namenode.invalidate.work.pct.per.iteration</name>
  <value>0.35f</value>
 </property>

 <property>
  <name>dfs.namenode.replication.work.multiplier.per.iteration</name>
  <value>4</value>
 </property>

 <property>
  <name>dfs.namenode.datanode.registration.ip-hostname-check</name>
  <value>false</value>
 </property>

 <property>
   <name>dfs.client.read.shortcircuit</name>
   <value>true</value>
 </property>

 <property>
  <name>dfs.client.read.shortcircuit.streams.cache.size</name>
  <value>1000</value>
 </property>

 <property>
  <name>dfs.client.read.shortcircuit.streams.cache.size.expiry.ms</name>
   <value>1000</value>
 </property>

 <property>
  <name>dfs.domain.socket.path</name>
  <value>/var/run/hadoop-hdfs/dn._PORT</value>
 </property>
</configuration>
asked Oct 16 '15 by kyarovoy


2 Answers

I've found the solution - adding

spark.files file:///opt/spark/conf/hdfs-site.xml,file:///opt/spark/conf/core-site.xml

to conf/spark-defaults.conf on each slave solves the problem.

After that, the executors successfully download core-site.xml and hdfs-site.xml from the driver program.
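
The same files can also be shipped per application instead of via spark-defaults.conf, either with the --files option of spark-submit (which should be equivalent) or programmatically. A minimal sketch of the programmatic variant (assuming the configs live in /opt/spark/conf on the submitting machine):

import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf()
  .setAppName("HdfsTest")
  // Ship the Hadoop client configuration to every executor so the
  // "hdfs" nameservice can be resolved there as well
  .set("spark.files",
    "file:///opt/spark/conf/hdfs-site.xml,file:///opt/spark/conf/core-site.xml")
val sc = new SparkContext(sparkConf)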

answered Sep 30 '22 by kyarovoy


Internally, Spark falls back to the default configuration for fs.defaultFS, which is your local file:// filesystem.

In order for it to honor HDFS HA, you need to pass both core-site.xml and hdfs-site.xml to the SparkContext, either via the CLASSPATH or as shown below (make sure these files are available in the same location on the local slave nodes, e.g. /config/core-site.xml).

For example, in Spark 1.x:

val sc = new SparkContext(sparkConf)

In Spark 2.x:

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
val sc = sparkSession.sparkContext

In either case,

sc.hadoopConfiguration.addResource(new org.apache.hadoop.fs.Path("/config/core-site.xml"))
sc.hadoopConfiguration.addResource(new org.apache.hadoop.fs.Path("/config/hdfs-site.xml"))
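
Once both files are visible to the driver and the executors, the HA URI should resolve without naming the active namenode, for example (reusing the testfile path from the question):

// Sanity check: the nameservice URI resolves without an explicit namenode host/port
val count = sc.textFile("hdfs://hdfs/testfile").count()
println("read " + count + " lines via the HA nameservice")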
answered Sep 30 '22 by Kiran N