How do I get a spark job to use all available resources on a Google Cloud DataProc cluster?

For example, I currently have a DataProc cluster consisting of a master and 4 workers; each machine has 8 vCPUs and 30 GB of memory.

Whenever I submit a job to the cluster, it commits a maximum of 11 GB in total, engages only 2 of the worker nodes, and on those nodes uses only 2 of the vCPUs. This makes a job that should take only a few minutes take nearly an hour to execute.

I have tried editing the spark-defaults.conf file on the master node, and I have tried running my spark-submit command with the arguments --executor-cores 4 --executor-memory 20g --num-executors 4, but neither has had any effect.
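
For reference, the submit command I'm running looks roughly like this (the script name here is just a placeholder):

spark-submit \
  --executor-cores 4 \
  --executor-memory 20g \
  --num-executors 4 \
  my_job.py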

These clusters will only be spun up to perform a single task and will then be torn down, so the resources do not need to be held for any other jobs.

asked by Cam

2 Answers

I managed to resolve my issue by changing the scheduler from FAIR to FIFO, adding the following to the end of my cluster create command:

--properties spark:spark.scheduler.mode=FIFO
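
For context, the full create command ends up looking something like this (the cluster name is a placeholder, and the machine types are ones that match the 8-vCPU / 30 GB workers described above):

gcloud dataproc clusters create my-cluster \
    --num-workers 4 \
    --master-machine-type n1-standard-8 \
    --worker-machine-type n1-standard-8 \
    --properties spark:spark.scheduler.mode=FIFO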

answered by Cam


You might want to see whether what you're looking at is related to the question "Dataproc set number of vcores per executor container": the number of vcores in use reported by YARN is known to be incorrect, but it's only a cosmetic defect. On a Dataproc cluster with 8-core machines, the default configuration already sets 4 cores per executor; if you click through YARN to the Spark app master, you should see that Spark is indeed able to pack 4 concurrent tasks per executor.

That part explains what might look like "only using 2 vCPUs" per node.

The fact that the job only engages two of the worker nodes hints that there's more to it, though; the amount of parallelism you get is related to how well the data is partitioned. If you have input files that can't be split, such as gzip files, then unfortunately there's no easy way to increase input parallelism. However, at least in later pipeline stages, or if you do have splittable files, you can increase parallelism by specifying the number of Spark partitions at read time or by calling repartition in your code (sketched at the end of this answer).

Depending on your input size, you could also experiment with decreasing fs.gs.block.size; it defaults to 134217728 (128 MB), but you could set it to half or a quarter of that, either at cluster creation time:

--properties core:fs.gs.block.size=67108864

or at job submission time:

--properties spark.hadoop.fs.gs.block.size=67108864
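
To illustrate the repartitioning side, here's a rough PySpark sketch; the bucket path and the partition count of 64 are just illustrative, not recommendations:

# Minimal PySpark sketch: ask for more partitions up front, or repartition later.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("parallelism-example").getOrCreate()

# Request a minimum number of partitions when reading splittable text input.
lines = spark.sparkContext.textFile("gs://my-bucket/input/*.csv", minPartitions=64)

# Or repartition a DataFrame before the expensive stages so all executors get work.
df = spark.read.csv("gs://my-bucket/input/*.csv", header=True)
df = df.repartition(64)
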
answered by Dennis Huo