
How are stages split into tasks in Spark?

Tags:

apache-spark

People also ask

How are jobs, stages, and tasks created in Spark?

A job comprises several stages. When Spark encounters a function that requires a shuffle, it creates a new stage. Transformations such as reduceByKey() and join() trigger a shuffle and therefore start a new stage. Spark also creates a stage when you read a dataset.
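As a minimal sketch (the path and data are hypothetical), a word count run in spark-shell shows this: reduceByKey introduces a shuffle, so the job is split into two stages.

val lines  = sc.textFile("/tmp/words.txt")              // placeholder path
val counts = lines.flatMap(_.split("\\s+"))
                  .map(word => (word, 1))
                  .reduceByKey(_ + _)                    // shuffle => stage boundary
counts.collect()
// In the Spark UI this job appears as two stages:
// stage 0 (read + flatMap + map) and stage 1 (reduceByKey + collect).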

How are tasks distributed in Spark?

The driver program splits the Spark application into tasks and schedules them to run on the executors. The task scheduler resides in the driver and distributes tasks among the workers. The two main roles of the driver are converting the user program into tasks and scheduling those tasks on the executors.

How are stages defined in Spark?

A stage is a step in the physical execution plan: a physical unit of execution consisting of a set of parallel tasks, one task per partition. In other words, each job gets divided into smaller sets of tasks, and each such set is a stage.

How many tasks does Spark run on each partition?

Spark assigns one task per partition, and each executor core can process one task at a time.
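
For instance (a spark-shell sketch with arbitrary numbers), requesting an explicit partition count fixes the number of tasks an action launches:

val nums = sc.parallelize(1 to 100, numSlices = 4)   // 4 partitions
nums.getNumPartitions                                // 4
nums.count()                                         // runs as one stage of 4 tasks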


You have a pretty nice outline here. To answer your questions:

  • A separate task does need to be launched for each partition of data for each stage. Consider that each partition will likely reside in a distinct physical location, e.g. blocks in HDFS or directories/volumes for a local file system.

Note that the submission of stages is driven by the DAG Scheduler. This means that stages that are not interdependent may be submitted to the cluster for execution in parallel: this maximizes the parallelization capability on the cluster. So if operations in our dataflow can happen simultaneously, we will expect to see multiple stages launched.

We can see that in action in the following toy example in which we do the following types of operations:

  • load two datasources
  • perform some map operation on both of the data sources separately
  • join them
  • perform some map and filter operations on the result
  • save the result

So then how many stages will we end up with?

  • 1 stage each for loading the two datasources in parallel = 2 stages
  • A third stage representing the join that is dependent on the other two stages
  • Note: all of the follow-on operations working on the joined data may be performed in the same stage because they must happen sequentially. There is no benefit to launching additional stages because they cannot start work until the prior operation has completed.

Here is that toy program:

val sfi  = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi,xi*xi) }
val sp = sc.parallelize{ (0 until 1000).map{ x => (x,x * x+1) }}
val spj = sfi.join(sp)
val sm = spj.mapPartitions{ iter => iter.map{ case (k,(v1,v2)) => (k, v1+v2) }}
val sf = sm.filter{ case (k,v) => v % 10 == 0 }
sf.saveAsTextFile("/data/blah/out")

And here is the DAG of the result:

[DAG visualization from the Spark UI: two independent load stages feeding the stage that performs the join and the subsequent operations]

Now: how many tasks? The number of tasks should be equal to

Sum over all stages of (#partitions in that stage)
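
As a rough sanity check on the toy job above (only a sketch: the actual values depend on the input file's splits and on spark.default.parallelism):

// One entry per stage of the toy job: the two loads and the joined tail.
val partitionsPerStage = Seq(
  sfi.getNumPartitions,  // load + map of /data/blah/input: one partition per input split
  sp.getNumPartitions,   // the parallelized range: defaults to spark.default.parallelism
  sf.getNumPartitions    // join + mapPartitions + filter + save
)
val totalTasks = partitionsPerStage.sum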


This might help you better understand the different pieces:

  • Stage: a collection of tasks, i.e. the same processing running against different subsets of the data (partitions).
  • Task: represents a unit of work on a partition of a distributed dataset. So in each stage, number-of-tasks = number-of-partitions, or as you said, "one task per stage per partition".
  • Each executor runs in one YARN container, and each container resides on one node.
  • Each stage utilizes multiple executors, and each executor is allocated multiple vcores.
  • Each vcore can execute exactly one task at a time.
  • So at any stage, multiple tasks can be executed in parallel: number-of-tasks running = number-of-vcores being used (see the sketch below).
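
A small sketch of that last point (the config keys are standard Spark settings, but the default values shown here are only assumptions; on YARN the real values come from your spark-submit flags or dynamic allocation):

val sparkConf    = sc.getConf
val numExecutors = sparkConf.getInt("spark.executor.instances", 2)  // assumed default
val coresPerExec = sparkConf.getInt("spark.executor.cores", 1)      // assumed default
val taskSlots    = numExecutors * coresPerExec   // max tasks running concurrently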

If I understand correctly, there are two (related) things that confuse you:

1) What determines the content of a task?

2) What determines the number of tasks to be executed?

Spark's engine "glues" together simple operations on consecutive rdds, for example:

rdd1 = sc.textFile( ... )
rdd2 = rdd1.filter( ... )
rdd3 = rdd2.map( ... )
rdd3RowCount = rdd3.count

So when rdd3 is (lazily) computed, Spark will generate a task per partition of rdd1, and each task will execute both the filter and the map per line to produce rdd3.
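
For illustration, here is one concrete (hypothetical) way to fill in those elided arguments; the whole chain still runs as a single stage with one task per partition of rdd1:

val rdd1 = sc.textFile("/tmp/numbers.txt")      // placeholder path
val rdd2 = rdd1.filter(line => line.nonEmpty)   // narrow: no shuffle
val rdd3 = rdd2.map(line => line.trim.toInt)    // narrow: no shuffle
val rdd3RowCount = rdd3.count()                 // one stage, one task per input split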

The number of tasks is determined by the number of partitions. Every RDD has a defined number of partitions. For a source RDD that is read from HDFS ( using sc.textFile( ... ) for example ) the number of partitions is the number of splits generated by the input format. Some operations on RDD(s) can result in an RDD with a different number of partitions:

rdd2 = rdd1.repartition( 1000 ) will result in rdd2 having 1000 partitions ( regardless of how many partitions rdd1 had ).

Another example is joins:

rdd3 = rdd1.join( rdd2 , numPartitions = 1000 ) will result in rdd3 having 1000 partitions ( regardless of the number of partitions of rdd1 and rdd2 ).
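
A quick spark-shell sketch of both claims (the data is made up purely to have key/value pairs to join):

val pairs         = sc.parallelize(1 to 10000).map(x => (x % 100, x))
val repartitioned = pairs.repartition(1000)
repartitioned.getNumPartitions                  // 1000, whatever pairs had before

val joined = pairs.join(repartitioned, 1000)    // join(other, numPartitions)
joined.getNumPartitions                         // 1000 as well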

( Most ) operations that change the number of partitions involve a shuffle. When we do, for example:

rdd2 = rdd1.repartition( 1000 ) 

what actually happens is that the task on each partition of rdd1 needs to produce an end-output that can be read by the following stage so that rdd2 ends up with exactly 1000 partitions ( how? by hash or sort partitioning ). Tasks on this side are sometimes referred to as "Map ( side ) tasks". A task that will later run on rdd2 will act on one partition ( of rdd2! ) and will have to figure out how to read/combine the map-side outputs relevant to that partition. Tasks on this side are sometimes referred to as "Reduce ( side ) tasks".
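
A short sketch of those two sides for a repartition (the path is the same placeholder used earlier; the split count N depends on the file):

val before = sc.textFile("/data/blah/input")    // suppose this yields N input splits
val after  = before.repartition(1000)
after.count()
// Stage 1: N "map-side" tasks, one per partition of `before`, each writing its rows
//          spread across 1000 shuffle buckets.
// Stage 2: 1000 "reduce-side" tasks, one per partition of `after`, each fetching and
//          combining its bucket from every map-side output, then counting.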

The 2 questions are related: the number of tasks in a stage is the number of partitions ( common to the consecutive rdds "glued" together ), and the number of partitions of an rdd can change between stages ( by specifying the number of partitions to some shuffle-causing operation, for example ).

Once the execution of a stage commences, its tasks can occupy task slots. The number of concurrent task-slots is numExecutors * ExecutorCores. In general, these can be occupied by tasks from different, non-dependent stages.