
How does Spark manage stages?

Tags:

apache-spark

I am trying to understand how jobs and stages are defined in Spark, and for that I am using the code that I found here together with the Spark UI. To make the work visible in the Spark UI I had to copy and paste the text in the input files several times, so it takes more time to process.

Here is the output of the Spark UI:

(Spark UI screenshot: Jobs)

Now, I understand that there are three jobs because there are three actions, and that the stages are created because of shuffle operations, but what I don't understand is why stages 4, 5 and 6 of Job 1 are the same as stages 0, 1 and 2 of Job 0, and why the same happens for Job 2.

How can I know which stages will appear in more than one job just by looking at the Java code (before executing anything)? And also, why are stages 4 and 9 skipped, and how can I know that this will happen before executing?

asked Dec 31 '16 by Thirsty

1 Answer

I understand that there are three jobs because there are three actions

I'd even say that there could have been more Spark jobs, but three is the minimum. It all depends on the implementation of the transformations and on the actions used.
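As a minimal sketch (hypothetical code, not the asker's, just to illustrate the counting): every action below triggers its own job, while the stages inside each job come from the shuffle boundaries.

// Hypothetical word count: one shuffle, then three actions => three jobs.
val counts = sc.textFile("input.txt")     // transformations are lazy, no job yet
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)                     // shuffle => stage boundary

counts.count()                            // Job 0
counts.collect()                          // Job 1
counts.saveAsTextFile("wordcounts")       // Job 2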

I don't understand why stages 4, 5 and 6 of Job 1 are the same as stages 0, 1 and 2 of Job 0, and why the same happens for Job 2.

Job 1 is the result of some action that ran on an RDD, finalRdd. That RDD was created using (going backwards): join, distinct, map, and textFile.

// Key people by the third column; distinct introduces a shuffle.
val people = sc.textFile("people.csv").map { line =>
  val tokens = line.split(",")
  val key = tokens(2)
  (key, (tokens(0), tokens(1)))
}.distinct

// Key cities by the first column; no shuffle here.
val cities = sc.textFile("cities.csv").map { line =>
  val tokens = line.split(",")
  (tokens(0), tokens(1))
}

// join adds another shuffle dependency.
val finalRdd = people.join(cities)

Run the above and you'll see the same DAG.
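Note that defining finalRdd alone runs nothing, since transformations are lazy; to actually see the job and its DAG in the Spark UI you have to call an action on it, for example:

// Any action will do; count() simply materializes the join.
finalRdd.count()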

(Spark UI screenshot: Job 6 DAG)

Now, when you run actions on the results of leftOuterJoin or rightOuterJoin, you'll get the other two DAGs. You're reusing the previously-used RDDs to run new Spark jobs, and so you'll see the same stages.
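As a sketch (reusing the people and cities RDDs from above, with count() as a stand-in for whatever action the original code used), the other two jobs could come from something like:

// Each line below is a new action on already-used RDDs: a new job
// whose DAG repeats the stages already seen in the first job.
people.leftOuterJoin(cities).count()
people.rightOuterJoin(cities).count()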

why are stages 4 and 9 skipped

Often, Spark will skip executing some stages. The greyed-out stages have already been computed, so Spark reuses their shuffle output rather than recomputing them, which improves performance.

(Spark UI screenshot: Job 7 DAG)
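You can reproduce the effect with a minimal sketch like this (same RDDs as before): the second action reuses the shuffle files written by the first one, so the corresponding stages show up as skipped in the Spark UI.

val joined = people.join(cities)
joined.count()   // first job: all stages are executed
joined.count()   // second job: the already-computed shuffle stages are skipped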

How can I know which stages will appear in more than one job just by looking at the Java code (before executing anything)?

That's what RDD lineage (graph) offers.

scala> people.leftOuterJoin(cities).toDebugString
res15: String =
(3) MapPartitionsRDD[99] at leftOuterJoin at <console>:28 []
 |  MapPartitionsRDD[98] at leftOuterJoin at <console>:28 []
 |  CoGroupedRDD[97] at leftOuterJoin at <console>:28 []
 +-(2) MapPartitionsRDD[81] at distinct at <console>:27 []
 |  |  ShuffledRDD[80] at distinct at <console>:27 []
 |  +-(2) MapPartitionsRDD[79] at distinct at <console>:27 []
 |     |  MapPartitionsRDD[78] at map at <console>:24 []
 |     |  people.csv MapPartitionsRDD[77] at textFile at <console>:24 []
 |     |  people.csv HadoopRDD[76] at textFile at <console>:24 []
 +-(3) MapPartitionsRDD[84] at map at <console>:29 []
    |  cities.csv MapPartitionsRDD[83] at textFile at <console>:29 []
    |  cities.csv HadoopRDD[82] at textFile at <console>:29 []

As you can see, you will end up with 4 stages, since there are 3 shuffle dependencies (the edges marked with the numbers of partitions).

The numbers in round brackets are the numbers of partitions that DAGScheduler will eventually use to create task sets with exactly that many tasks, one TaskSet per stage.
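If you want to confirm those partition counts without reading the lineage output, each RDD reports them directly; the values in the comments assume the same RDDs as above.

people.getNumPartitions                        // 2 -> 2 tasks in that stage
cities.getNumPartitions                        // 3 -> 3 tasks in that stage
people.leftOuterJoin(cities).getNumPartitions  // 3 -> 3 tasks in the final stage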

answered Oct 25 '22 by Jacek Laskowski