
How does Spark parallelize slices to tasks/executors/workers?

Tags:

apache-spark

I have a 2-node Spark cluster with 4 cores per node.

        MASTER
(Worker-on-master)              (Worker-on-node1)

Spark config:

  • slaves: master, node1
  • SPARK_WORKER_INSTANCES=1

I am trying to understand Spark's parallelize behaviour. The SparkPi example has this code:

import scala.math.random  // needed for `random` below (as in the SparkPi example)

val slices = 8  // my test value for slices
val n = 100000 * slices
// `spark` is the SparkContext created by the SparkPi example
val count = spark.parallelize(1 to n, slices).map { i =>
  val x = random * 2 - 1
  val y = random * 2 - 1
  if (x * x + y * y < 1) 1 else 0
}.reduce(_ + _)

As per documentation:

Spark will run one task for each slice of the cluster. Typically you want 2-4 slices for each CPU in your cluster.

I set slices to 8, which means the working set will be divided among 8 tasks on the cluster; in turn, each worker node gets 4 tasks (one per core).
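A quick way to verify that slices really became 8 partitions (a minimal sketch, using the same `spark` SparkContext, `n`, and `slices` as above):

val rdd = spark.parallelize(1 to n, slices)
println(rdd.partitions.length)  // prints 8 for slices = 8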

Questions:

  1. Where can I see task-level details? Inside the executors view I don't see a task breakdown, so I cannot see the effect of slices in the UI.

  2. How do I programmatically find the working set size for the map function above? I assume it is n / slices (100000 here).

  3. Are the multiple tasks run by an executor run sequentially, or in parallel in multiple threads?

  4. What is the reasoning behind 2-4 slices per CPU?

  5. I assume that, ideally, we should tune SPARK_WORKER_INSTANCES to correspond to the number of cores in each node (in a homogeneous cluster) so that each core gets its own executor and task (1:1:1).

asked Sep 05 '14 by nom-mon-ir


2 Answers

Taking a stab at #4:

It's worth noting that "slices" and "partitions" are the same thing; there is a bug filed and an effort to clean up the docs: https://issues.apache.org/jira/browse/SPARK-1701

Here's a link that expands on the reasoning behind #4: http://spark.apache.org/docs/latest/tuning.html#level-of-parallelism

Specifically look at the line:

In general, we recommend 2-3 tasks per CPU core in your cluster.

An important consideration is avoiding shuffles, and setting the number of slices is part of that. It's a more complicated subject than I can do justice to here, but the basic idea is to partition your data into enough partitions/slices up front so that Spark doesn't have to re-shuffle later to get more partitions.
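To illustrate the difference, a sketch of my own (assuming `sc` is your SparkContext): choosing the slice count when the RDD is created avoids the shuffle that a later repartition would cause.

val data = 1 to 1000000

// number of slices fixed up front: no shuffle needed later
val rdd = sc.parallelize(data, 16)

// created with the default parallelism, then repartitioned:
// repartition() moves data across the network (a shuffle)
val reshuffled = sc.parallelize(data).repartition(16)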

answered Oct 04 '22 by JimLohse


I will try to answer your questions as best I can:

1.- Where can I see task-level details?

When submitting a job, Spark stores information about the task breakdown on each worker node, not just on the master. This data is stored, I believe (I have only tested Spark on EC2), in the work folder under the Spark directory.
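In addition (my own note, assuming a standard standalone setup): while the application is running, the driver's web UI on port 4040 shows per-task details under the Stages tab, and you can enable event logging so that history is kept after the application finishes. A minimal sketch:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("SparkPi")
  .set("spark.eventLog.enabled", "true")                 // keep per-task history after the app ends
  .set("spark.eventLog.dir", "file:///tmp/spark-events") // this directory must already exist
val sc = new SparkContext(conf)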

2.- How to programmatically find the working set size for the map function?

Although I am not sure whether the in-memory size of the slices is stored anywhere, the logs mentioned in my answer to the first question provide information about the number of lines each RDD partition contains.
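If you want to check this programmatically rather than from the logs, a minimal sketch of my own (assuming `sc` is your SparkContext and reusing `n` and `slices` from the question) that counts the elements in each partition:

val rdd = sc.parallelize(1 to n, slices)
// one (partitionIndex, elementCount) pair per partition (slice)
val sizes = rdd.mapPartitionsWithIndex((idx, iter) => Iterator((idx, iter.size))).collect()
sizes.foreach { case (idx, size) => println(s"partition $idx holds $size elements") }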

3.- Are the multiple tasks run by an executor run sequentially or in parallel in multiple threads?

I believe different tasks inside a node run sequentially. This is shown in the logs indicated above, which record the start and end time of every task.
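One way to check this empirically (a sketch I'm adding, not from the logs described above): record the host, thread name, and start/end times inside each task, then see whether the intervals on the same host overlap.

val rdd = sc.parallelize(1 to n, slices)
rdd.mapPartitionsWithIndex { (idx, iter) =>
  // record which host and JVM thread ran this partition, and when
  val host = java.net.InetAddress.getLocalHost.getHostName
  val thread = Thread.currentThread.getName
  val start = System.currentTimeMillis
  val count = iter.size
  val end = System.currentTimeMillis
  Iterator((idx, host, thread, start, end, count))
}.collect().foreach(println)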

4.- Reasoning behind 2-4 slices per CPU

Some nodes finish their tasks faster than others. Having more slices than available cores distributes the tasks in a balanced way, avoiding long processing times caused by slower nodes.
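In practice, that rule of thumb amounts to something like the following sketch (my own illustration, assuming the 2-node, 4-cores-per-node cluster from the question and `sc` as the SparkContext):

val totalCores = 2 * 4          // 2 workers with 4 cores each
val slices = 3 * totalCores     // 2-4 slices per core; 3 is a reasonable middle ground
val rdd = sc.parallelize(1 to 100000 * slices, slices)
println(sc.defaultParallelism)  // what Spark would pick if numSlices were omitted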

answered Oct 04 '22 by Mikel Urkia