I have a problem with rebalancing the resources of Apache Spark jobs across YARN Fair Scheduler queues.
For the tests I configured Hadoop 2.6 (I also tried 2.7) to run in pseudo-distributed mode with local HDFS on macOS. For job submission I used the "Pre-built for Hadoop 2.6 and later" Spark 1.4 distribution (I also tried 1.5) from Spark's website.
When tested with a basic configuration on Hadoop MapReduce jobs, the Fair Scheduler works as expected: when resources of the cluster exceed some maximum, fair shares are calculated, and resources for jobs in different queues are preempted and balanced based on these calculations.
When I run the same test with Spark jobs, YARN calculates the fair shares for each job correctly, but the resources held by Spark containers are not rebalanced.
Here are my conf files:
$HADOOP_HOME/etc/hadoop/yarn-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
    <value>org.apache.spark.network.yarn.YarnShuffleService</value>
  </property>
  <property>
    <name>yarn.resourcemanager.scheduler.class</name>
    <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
  </property>
  <property>
    <name>yarn.scheduler.fair.preemption</name>
    <value>true</value>
  </property>
</configuration>
$HADOOP_HOME/etc/hadoop/fair-scheduler.xml
<?xml version="1.0" encoding="UTF-8"?>
<allocations>
  <defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>
  <queue name="prod">
    <weight>40</weight>
    <schedulingPolicy>fifo</schedulingPolicy>
  </queue>
  <queue name="dev">
    <weight>60</weight>
    <queue name="eng" />
    <queue name="science" />
  </queue>
  <queuePlacementPolicy>
    <rule name="specified" create="false" />
    <rule name="primaryGroup" create="false" />
    <rule name="default" queue="dev.eng" />
  </queuePlacementPolicy>
</allocations>
$HADOOP_HOME/etc/hadoop/core-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>
$HADOOP_HOME/etc/hadoop/hdfs-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>
And the test case is:
Run a job on the "prod" queue with weight 40 (so it should be entitled to 40% of all resources). As expected, the job takes all the free resources it needs (62.5% of the cluster's resources).
./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn-cluster \
--driver-memory 512M \
--executor-memory 768M \
--executor-cores 1 \
--num-executors 2 \
--queue prod \
lib/spark-examples*.jar 100000
After that, run the same job on the "dev.eng" queue with weight 60. This means the second job should be allocated 60% of all resources, and the first job's resources should decrease to ~40%.
./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn-cluster \
--driver-memory 512M \
--executor-memory 768M \
--executor-cores 1 \
--num-executors 2 \
--queue dev.eng \
lib/spark-examples*.jar 100000
Unfortunately, the cluster's resource allocation does not change: it stays at 62.5% for the first job and 37.5% for the second.
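You need to set at least one of the preemption timeouts in your allocation file (fair-scheduler.xml). There are two: one for the minimum share and one for the fair share, both specified in seconds. By default neither timeout is set, so containers are not preempted even when yarn.scheduler.fair.preemption is true.
From Hadoop: The Definitive Guide, 4th Edition: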
If a queue waits for as long as its minimum share preemption timeout without receiving its minimum guaranteed share, then the scheduler may preempt other containers. The default timeout is set for all queues via the defaultMinSharePreemptionTimeout top-level element in the allocation file, and on a per-queue basis by setting the minSharePreemptionTimeout element for a queue.
Likewise, if a queue remains below half of its fair share for as long as the fair share preemption timeout, then the scheduler may preempt other containers. The default timeout is set for all queues via the defaultFairSharePreemptionTimeout top-level element in the allocation file, and on a per-queue basis by setting fairSharePreemptionTimeout on a queue. The threshold may also be changed from its default of 0.5 by setting defaultFairSharePreemptionThreshold and fairSharePreemptionThreshold (per-queue).
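As a rough sketch of what the quoted passage describes, the allocation file from the question could be extended with the top-level timeout elements as shown below. The 30-second values are purely illustrative assumptions, not recommendations. The threshold of 1.0 is also an assumption: with the default of 0.5, the second job in the question (37.5% of the cluster against a 60% fair share) is still above half of its fair share, so the fair share timeout alone would probably not trigger preemption in this particular test.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<allocations>
  <defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>

  <!-- Illustrative values (assumptions), both in seconds -->
  <defaultMinSharePreemptionTimeout>30</defaultMinSharePreemptionTimeout>
  <defaultFairSharePreemptionTimeout>30</defaultFairSharePreemptionTimeout>

  <!-- Assumption: preempt when a queue is below its full fair share
       (the default threshold is 0.5, i.e. half of the fair share) -->
  <defaultFairSharePreemptionThreshold>1.0</defaultFairSharePreemptionThreshold>

  <queue name="prod">
    <weight>40</weight>
    <schedulingPolicy>fifo</schedulingPolicy>
  </queue>
  <queue name="dev">
    <weight>60</weight>
    <queue name="eng" />
    <queue name="science" />
  </queue>
  <queuePlacementPolicy>
    <rule name="specified" create="false" />
    <rule name="primaryGroup" create="false" />
    <rule name="default" queue="dev.eng" />
  </queuePlacementPolicy>
</allocations>
```

The same settings can instead be applied to a single queue by placing minSharePreemptionTimeout, fairSharePreemptionTimeout, or fairSharePreemptionThreshold inside that queue's element. Note that a minimum share timeout only has an effect for queues that define a minimum share, which the queues in the question do not.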