 

How to fully utilize all Spark nodes in cluster?

I have launched a 10-node cluster with the ec2 script in standalone mode for Spark. I am accessing data in S3 buckets from within the PySpark shell, but when I perform transformations on the RDD, only one node is ever used. For example, the code below reads in data from the Common Crawl corpus:

bucket = ("s3n://@aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2014-23/"
          "/segments/1404776400583.60/warc/CC-MAIN-20140707234000-00000-ip-10"
          "-180-212-248.ec2.internal.warc.gz")

data = sc.textFile(bucket)
data.count()

When I run this, only one of my 10 slaves processes the data. I know this because only one slave (213) has any logs of the activity when viewed from the Spark web console. When I view the activity in Ganglia, this same node (213) is the only slave with a spike in memory usage while the job was running.

Furthermore, I get exactly the same performance when I run the same script on an ec2 cluster with only one slave. I am using Spark 1.1.0, and any help or advice is greatly appreciated.

asked Dec 17 '14 by Michael David Watson

People also ask

Do you need to install Spark on all nodes of the YARN cluster?

If you use YARN as the cluster manager on a cluster with multiple nodes, you do not need to install Spark on each node. YARN distributes the Spark binaries to the nodes when a job is submitted. Running Spark on YARN requires a binary distribution of Spark that is built with YARN support.
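As a minimal sketch of what that looks like in practice (assuming a YARN-enabled Spark 1.x build and HADOOP_CONF_DIR pointing at your cluster configuration; the app name is a placeholder):

from pyspark import SparkConf, SparkContext

# Connect to YARN in client mode; YARN ships the Spark binaries and your
# application code to the worker nodes, so nothing is pre-installed on them.
conf = SparkConf().setMaster("yarn-client").setAppName("yarn-example")
sc = SparkContext(conf=conf)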

How many nodes are in a Spark cluster?

As an example: 11 nodes (1 master node and 10 worker nodes), 66 cores (6 cores per node), and 110 GB of RAM (10 GB per node).

How does Apache spark run on a cluster?

Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application. Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to the executors. Finally, SparkContext sends tasks to the executors to run.
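A minimal PySpark sketch of that life cycle (the master URL and app name are placeholders for illustration):

from pyspark import SparkConf, SparkContext

# The driver connects to the standalone cluster manager...
conf = (SparkConf()
        .setMaster("spark://<master-hostname>:7077")
        .setAppName("cluster-demo"))
sc = SparkContext(conf=conf)  # ...and acquires executors on the workers.

# The driver ships this computation to the executors as tasks.
print(sc.parallelize(range(1000), 10).sum())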


1 Answer

...ec2.internal.warc.gz

I think you've hit a fairly typical problem with gzipped files in that they cannot be loaded in parallel. More specifically, a single gzipped file cannot be loaded in parallel by multiple tasks, so Spark will load it with 1 task and thus give you an RDD with 1 partition.

(Note, however, that Spark can load 10 gzipped files in parallel just fine; it's just that each of those 10 files can only be loaded by 1 task. You can still get parallelism across files, just not within a file.)
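As a sketch of getting parallelism across files, you could point textFile at a glob instead of a single file (the pattern below is an assumption; adjust it to the segments you actually want):

# Each matched .warc.gz file becomes (at least) one partition, so ten
# files can be read by up to ten tasks at once.
many = sc.textFile("s3n://@aws-publicdatasets/common-crawl/crawl-data/"
                   "CC-MAIN-2014-23/segments/1404776400583.60/warc/*.warc.gz")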

You can confirm that you only have 1 partition by checking the number of partitions in your RDD explicitly:

data.getNumPartitions()
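With a single gzipped file, you would expect output like this (illustrative):

>>> data.getNumPartitions()
1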

The upper bound on the number of tasks that can run in parallel on an RDD is the number of partitions in the RDD or the number of slave cores in your cluster, whichever is lower.
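In other words (with hypothetical numbers for the cluster size):

num_partitions = 1          # one gzipped file -> an RDD with 1 partition
total_slave_cores = 10 * 4  # hypothetical: 10 slaves with 4 cores each
max_parallel_tasks = min(num_partitions, total_slave_cores)  # -> 1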

In your case, it's the number of RDD partitions. You can increase that by repartitioning your RDD as follows:

data = sc.textFile(bucket).repartition(sc.defaultParallelism * 3)

Why sc.defaultParallelism * 3?

The Spark Tuning guide recommends having 2-3 tasks per core, and sc.defaultParallelism gives you the number of cores in your cluster.
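As a hypothetical worked example, if each of your 10 slaves had 4 cores:

>>> sc.defaultParallelism        # 10 slaves x 4 cores each
40
>>> sc.defaultParallelism * 3    # 2-3 tasks per core, per the tuning guide
120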

answered Sep 18 '22 by Nick Chammas