Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Running from a local IDE against a remote Spark cluster

We have a kerberized cluster with Spark running on Yarn. At the moment, we write our Spark code in Scala locally, then build a fat JAR which we copy over to the cluster and then run spark-submit. I would instead like to write Spark code on my local PC and have it run against the cluster directly. Is there a straightforward way to do this? The Spark docs don't seem to have any such pattern.

FYI, my local machine is running Windows and the cluster is running CDH.

like image 817
Vishakh Avatar asked Feb 14 '17 19:02

Vishakh


People also ask

Can I run Spark application locally by specifying local n?

You can run Spark in local mode. In this non-distributed single-JVM deployment mode, Spark spawns all the execution components - driver, executor, backend, and master - in the same single JVM. The default parallelism is the number of threads as specified in the master URL.

Can we run the Spark-submit in local mode in cluster?

Using --master option, you specify what cluster manager to use to run your application. Spark currently supports Yarn, Mesos, Kubernetes, Stand-alone, and local.

Can I run Spark job locally?

Local Mode is also known as Spark in-process is the default mode of spark. It does not require any resource manager. It runs everything on the same machine. Because of local mode, we are able to simply download spark and run without having to install any resource manager.


2 Answers

While cricket007's answer works for spark-submit, here is what I did to run against a remote cluster using IntelliJ:

First, make sure the JARs on the client and server sides are identical. Since we are using CDH 7.1, I made sure all my JARs came from the specific distribution.

Set HADOOP_CONF_DIR and YARN_CONF_DIR as described in cricket007's answer. Set "spark.yarn.principal" and "spark.yarn.keytab" as appropriate in the Spark conf.

If connecting to HDFS, make sure the following exclusion rule is set in build.sbt:

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.6.0-cdh5.7.1" excludeAll ExclusionRule(organization = "javax.servlet")

Make sure the spark-launcher and spark-yarn JARs are listed on build.sbt.

libraryDependencies += "org.apache.spark" %% "spark-launcher" % "1.6.0-cdh5.7.1"

libraryDependencies += "org.apache.spark" %% "spark-yarn" % "1.6.0-cdh5.7.1"

Find the CDH JARs on the server and copy them to a known location on HDFS. Add the following lines to your code:

final val CDH_JAR_PATH = "/opt/cloudera/parcels/CDH/jars"

final val hadoopJars: Seq[String] = Seq[String](
"hadoop-annotations-2.6.0-cdh5.7.1.jar"
, "hadoop-ant-2.6.0-cdh5.7.1.jar"
, "hadoop-ant-2.6.0-mr1-cdh5.7.1.jar"
, "hadoop-archive-logs-2.6.0-cdh5.7.1.jar"
, "hadoop-archives-2.6.0-cdh5.7.1.jar"
, "hadoop-auth-2.6.0-cdh5.7.1.jar"
, "hadoop-aws-2.6.0-cdh5.7.1.jar"
, "hadoop-azure-2.6.0-cdh5.7.1.jar"
, "hadoop-capacity-scheduler-2.6.0-mr1-cdh5.7.1.jar"
, "hadoop-common-2.6.0-cdh5.7.1.jar"
, "hadoop-core-2.6.0-mr1-cdh5.7.1.jar"
, "hadoop-datajoin-2.6.0-cdh5.7.1.jar"
, "hadoop-distcp-2.6.0-cdh5.7.1.jar"
, "hadoop-examples-2.6.0-mr1-cdh5.7.1.jar"
, "hadoop-examples.jar"
, "hadoop-extras-2.6.0-cdh5.7.1.jar"
, "hadoop-fairscheduler-2.6.0-mr1-cdh5.7.1.jar"
, "hadoop-gridmix-2.6.0-cdh5.7.1.jar"
, "hadoop-gridmix-2.6.0-mr1-cdh5.7.1.jar"
, "hadoop-hdfs-2.6.0-cdh5.7.1.jar"
, "hadoop-hdfs-nfs-2.6.0-cdh5.7.1.jar"
, "hadoop-kms-2.6.0-cdh5.7.1.jar"
, "hadoop-mapreduce-client-app-2.6.0-cdh5.7.1.jar"
, "hadoop-mapreduce-client-common-2.6.0-cdh5.7.1.jar"
, "hadoop-mapreduce-client-core-2.6.0-cdh5.7.1.jar"
, "hadoop-mapreduce-client-hs-2.6.0-cdh5.7.1.jar"
, "hadoop-mapreduce-client-hs-plugins-2.6.0-cdh5.7.1.jar"
, "hadoop-mapreduce-client-jobclient-2.6.0-cdh5.7.1.jar"
, "hadoop-mapreduce-client-nativetask-2.6.0-cdh5.7.1.jar"
, "hadoop-mapreduce-client-shuffle-2.6.0-cdh5.7.1.jar"
, "hadoop-nfs-2.6.0-cdh5.7.1.jar"
, "hadoop-openstack-2.6.0-cdh5.7.1.jar"
, "hadoop-rumen-2.6.0-cdh5.7.1.jar"
, "hadoop-sls-2.6.0-cdh5.7.1.jar"
, "hadoop-streaming-2.6.0-cdh5.7.1.jar"
, "hadoop-streaming-2.6.0-mr1-cdh5.7.1.jar"
, "hadoop-tools-2.6.0-mr1-cdh5.7.1.jar"
, "hadoop-yarn-api-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-applications-distributedshell-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-applications-unmanaged-am-launcher-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-client-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-common-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-registry-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-server-applicationhistoryservice-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-server-common-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-server-nodemanager-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-server-resourcemanager-2.6.0-cdh5.7.1.jar"
, "hadoop-yarn-server-web-proxy-2.6.0-cdh5.7.1.jar"
, "hbase-hadoop2-compat-1.2.0-cdh5.7.1.jar"
, "hbase-hadoop-compat-1.2.0-cdh5.7.1.jar")

final val sparkJars: Seq[String] = Seq[String](
"spark-1.6.0-cdh5.7.1-yarn-shuffle.jar",
"spark-assembly-1.6.0-cdh5.7.1-hadoop2.6.0-cdh5.7.1.jar",
"spark-avro_2.10-1.1.0-cdh5.7.1.jar",
"spark-bagel_2.10-1.6.0-cdh5.7.1.jar",
"spark-catalyst_2.10-1.6.0-cdh5.7.1.jar",
"spark-core_2.10-1.6.0-cdh5.7.1.jar",
"spark-examples-1.6.0-cdh5.7.1-hadoop2.6.0-cdh5.7.1.jar",
"spark-graphx_2.10-1.6.0-cdh5.7.1.jar",
"spark-hive_2.10-1.6.0-cdh5.7.1.jar",
"spark-launcher_2.10-1.6.0-cdh5.7.1.jar",
"spark-mllib_2.10-1.6.0-cdh5.7.1.jar",
"spark-network-common_2.10-1.6.0-cdh5.7.1.jar",
"spark-network-shuffle_2.10-1.6.0-cdh5.7.1.jar",
"spark-repl_2.10-1.6.0-cdh5.7.1.jar",
"spark-sql_2.10-1.6.0-cdh5.7.1.jar",
"spark-streaming-flume-sink_2.10-1.6.0-cdh5.7.1.jar",
"spark-streaming-flume_2.10-1.6.0-cdh5.7.1.jar",
"spark-streaming-kafka_2.10-1.6.0-cdh5.7.1.jar",
"spark-streaming_2.10-1.6.0-cdh5.7.1.jar",
"spark-unsafe_2.10-1.6.0-cdh5.7.1.jar",
"spark-yarn_2.10-1.6.0-cdh5.7.1.jar")

def getClassPath(jarNames: Seq[String], pathPrefix: String): String = {
jarNames.foldLeft("")((cp, name) => s"$cp:$pathPrefix/$name").drop(1)

}

Add these lines when creating a SparkConf:

.set("spark.driver.extraClassPath", getClassPath(sparkJars ++ hadoopJars, CDH_JAR_PATH))
.set("spark.executor.extraClassPath", getClassPath(sparkJars ++ hadoopJars, CDH_JAR_PATH))
.set("spark.yarn.jars", "hdfs://$YOUR_MACHINE/PATH_TO_JARS/*")

Your program should work now.

like image 159
Vishakh Avatar answered Nov 03 '22 21:11

Vishakh


Assuming you have the correct packages on your classpath (easiest setup by SBT, Maven, etc.), you should be able to spark-submit from anywhere. The --master flag is the main piece that really determines how the job is distributed. One thing to take into consideration is if your local machine is not blocked off from the YARN cluster via a firewall or other network prevention, for example. (Because you'd don't want people randomly running applications on your cluster)

From your local machine, you'll need the Hadoop configuration files from your cluster and setup $SPARK_HOME/conf directory to accommodate for some Hadoop related settings.

From Spark on YARN page.

Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the Hadoop cluster. These configs are used to write to HDFS and connect to the YARN ResourceManager. The configuration contained in this directory will be distributed to the YARN cluster so that all containers used by the application use the same configuration

These values are set from $SPARK_HOME/conf/spark-env.sh

Since you are Kerberized, see Long Running Spark Applciations

For long-running applications, such as Spark Streaming jobs, to write to HDFS, you must configure Kerberos authentication for Spark for Spark, and pass the Spark principal and keytab to the spark-submit script using the --principal and --keytab parameters

like image 21
OneCricketeer Avatar answered Nov 03 '22 21:11

OneCricketeer