Why does Yarn on EMR not allocate all nodes to running Spark jobs?

I'm running a job on Apache Spark on Amazon Elastic MapReduce (EMR). Currently I'm running on emr-4.1.0, which includes Amazon Hadoop 2.6.0 and Spark 1.5.0.

When I start the job, YARN correctly allocates all of the worker nodes to the Spark job (with one reserved for the driver, of course).

I have the magic "maximizeResourceAllocation" property set to "true", and the spark property "spark.dynamicAllocation.enabled" also set to "true".

However, if I resize the emr cluster by adding nodes to the CORE pool of worker machines, YARN only adds some of the new nodes to the spark job.

For example, this morning I had a job that was using 26 nodes (m3.2xlarge, if that matters): 1 for the driver, 25 executors. I wanted to speed up the job, so I tried adding 8 more nodes. YARN picked up all of the new nodes but allocated only 1 of them to the Spark job. Spark did successfully pick up that new node and is using it as an executor, but my question is: why is YARN letting the other 7 nodes just sit idle?

It's annoying for obvious reasons - I have to pay for the resources even though they're not being used, and my job hasn't sped up at all!

Does anybody know how YARN decides when to add nodes to running Spark jobs? What variables come into play? Memory? VCores? Anything?

Thanks in advance!

asked Nov 26 '15 by retnuH

People also ask

Do you need to install Spark on all nodes of YARN cluster?

No, it is not necessary to install Spark on all of the nodes. Since Spark runs on top of YARN, it uses YARN to execute its tasks across the cluster's nodes.

How does Spark work with YARN?

In cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
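As an illustration (the JAR and class names below are placeholders, not from the original question), the mode is selected with spark-submit's --deploy-mode flag:

# Cluster mode: the driver runs inside a YARN application master on the
# cluster, so the submitting machine can disconnect after submission.
spark-submit --master yarn --deploy-mode cluster \
    --class com.example.MyJob my-job.jar

# Client mode: the driver runs locally in the client process; the YARN
# application master only requests executor containers.
spark-submit --master yarn --deploy-mode client \
    --class com.example.MyJob my-job.jar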

What is YARN memory overhead in Spark?

Memory overhead is the amount of off-heap memory allocated to each executor. By default, memory overhead is set to either 10% of executor memory or 384 MB, whichever is higher.
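For example (the 8g executor memory is arbitrary; on the Spark 1.x/2.x versions discussed here the property is spark.yarn.executor.memoryOverhead, which newer releases renamed to spark.executor.memoryOverhead), the overhead can be raised explicitly at submit time:

# With an 8g heap, the default overhead would be max(0.10 * 8192, 384) = 819 MB.
# Bump it to 1 GB; the value is specified in MB.
spark-submit --master yarn --deploy-mode cluster \
    --conf spark.executor.memory=8g \
    --conf spark.yarn.executor.memoryOverhead=1024 \
    --class com.example.MyJob my-job.jar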


2 Answers

Okay, with the help of @sean_r_owen, I was able to track this down.

The problem was this: when setting spark.dynamicAllocation.enabled to true, spark.executor.instances shouldn't be set - an explicit value for that will override dynamic allocation and turn it off. It turns out that EMR sets it in the background if you do not set it yourself. To get the desired behaviour, you need to explicitly set spark.executor.instances to 0.

For the record, here are the contents of one of the files we pass to the --configurations flag when creating an EMR cluster:

[     {         "Classification": "capacity-scheduler",         "Properties": {             "yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"         }     },      {         "Classification": "spark",         "Properties": {             "maximizeResourceAllocation": "true"         }     },      {         "Classification": "spark-defaults",         "Properties": {             "spark.dynamicAllocation.enabled": "true",             "spark.executor.instances": "0"         }     }  ] 

This gives us an EMR cluster where Spark uses all the nodes, including added nodes, when running jobs. It also appears to use all/most of the memory and all (?) the cores.

(I'm not entirely sure that it's using all of the actual cores, but it is definitely using more than 1 VCore, which it wasn't before. Following Glennie Helles's advice, it is now behaving better and using half of the listed VCores, which seems to equal the actual number of physical cores.)
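One way to verify this from the master node (standard YARN CLI commands, nothing EMR-specific) is to check what each NodeManager actually reports as used memory and VCores:

# List all NodeManagers known to the ResourceManager
yarn node -list -all

# Show containers, memory and VCores used vs. available on one node
yarn node -status <node-id-from-the-list-above>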

answered Oct 18 '22 by retnuH


I observed the same behavior with nearly the same settings on emr-5.20.0. I didn't add nodes to an already-running cluster; instead I used TASK nodes (together with just one CORE node). I'm using InstanceFleets to define the MASTER, CORE and TASK nodes (with InstanceFleets I don't know exactly which InstanceTypes I will get, which is why I don't want to define the number of executors, cores and memory per executor myself, but want that to be maximized/optimized automatically).

With this setup, it only uses two TASK nodes (probably the first two nodes that are ready to use?) and never scales up while more TASK nodes get provisioned and finish the bootstrap phase.

What made it work in my case was setting the spark.default.parallelism parameter to the total number of cores of my TASK nodes, which is the same number used for the TargetOnDemandCapacity or TargetSpotCapacity of the TASK InstanceFleet:

[     {         "Classification": "capacity-scheduler",         "Properties": {             "yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"         }     },     {         "Classification": "spark",         "Properties": {             "maximizeResourceAllocation": "true"         }     },     {         "Classification": "spark-defaults",         "Properties": {             "spark.dynamicAllocation.enabled": "true",             "spark.default.parallelism", <Sum_of_Cores_of_all_TASK_nodes>         }     }  ] 

For the sake of completeness: I'm using one CORE node and several TASK nodes mainly to make sure the cluster has at least 3 nodes (1 MASTER, 1 CORE and at least one TASK node). Before that I tried to use only CORE nodes, but since in my case the number of cores is calculated depending on the actual task, it was possible to end up with a cluster consisting of just one MASTER and one CORE node. With the maximizeResourceAllocation option, such a cluster runs forever doing nothing, because the executor running the YARN application master occupies that single CORE node completely.
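To make the relationship between the fleet's target capacity and spark.default.parallelism concrete, here is a rough sketch of such a cluster definition. The instance types, the weights (set to each type's vCPU count) and the target capacity of 32 are made up for illustration, and networking/IAM flags are omitted:

aws emr create-cluster \
    --release-label emr-5.20.0 \
    --applications Name=Spark \
    --configurations file://./spark-config.json \
    --instance-fleets \
      InstanceFleetType=MASTER,TargetOnDemandCapacity=1,InstanceTypeConfigs=['{InstanceType=m5.xlarge}'] \
      InstanceFleetType=CORE,TargetOnDemandCapacity=1,InstanceTypeConfigs=['{InstanceType=m5.xlarge}'] \
      InstanceFleetType=TASK,TargetOnDemandCapacity=32,InstanceTypeConfigs=['{InstanceType=m5.2xlarge,WeightedCapacity=8}','{InstanceType=m5.4xlarge,WeightedCapacity=16}']
# With WeightedCapacity equal to each type's vCPUs, a target capacity of 32
# means 32 TASK cores in total, so <Sum_of_Cores_of_all_TASK_nodes> in the
# JSON above would be 32 for this cluster.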

answered Oct 18 '22 by Patter