I am trying to understand how jobs and stages are defined in Spark, and for that I am using the code that I found here together with the Spark UI. To be able to see the run in the Spark UI, I had to copy and paste the text in the input files several times so that processing takes longer.
Here is the output of spark UI:
Now, I understand that there are three jobs because there are three actions, and that the stages are generated because of shuffles. What I don't understand is why, in Job 1, stages 4, 5 and 6 are the same as stages 0, 1 and 2 of Job 0, and why the same happens for Job 2.
How can I know which stages will appear in more than one job just by looking at the Java code (before executing anything)? And also, why are stages 4 and 9 skipped, and how can I know that will happen before executing?
I understand that there are three jobs because there are three actions
I'd even say that there could have been more Spark jobs, but the minimum number is 3. It all depends on the implementation of the transformations and the action used.
What I don't understand is why, in Job 1, stages 4, 5 and 6 are the same as stages 0, 1 and 2 of Job 0, and why the same happens for Job 2.
Job 1 is the result of some action that ran on an RDD, finalRdd. That RDD was created using (going backwards): join, textFile, map, and distinct.
val people = sc.textFile("people.csv").map { line =>
  val tokens = line.split(",")
  val key = tokens(2)
  (key, (tokens(0), tokens(1)))
}.distinct
val cities = sc.textFile("cities.csv").map { line =>
  val tokens = line.split(",")
  (tokens(0), tokens(1))
}
val finalRdd = people.join(cities)
Run the above and you'll see the same DAG.
Now, when you run actions on the RDDs produced by the leftOuterJoin or rightOuterJoin transformations, you'll get the other two DAGs. Those jobs are built on the same previously used RDDs (people and cities), so you'll see the same stages again.
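To make the "three actions, three jobs over the same parent RDDs" point concrete, here is a minimal sketch. It is not the original program: the object name ThreeJoinsThreeJobs, the made-up in-memory rows (used instead of people.csv and cities.csv so that it runs on its own), the local[2] master, and the choice of count as the action are all assumptions for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ThreeJoinsThreeJobs {
  // Builds the three joins over the same two parent RDDs and runs one
  // action (count) on each. Returns (inner, leftOuter, rightOuter) counts.
  def joinCounts(sc: SparkContext): (Long, Long, Long) = {
    // Made-up rows standing in for people.csv (key in the third column).
    val people = sc
      .parallelize(Seq("Ann,Smith,1", "Bob,Jones,2", "Ann,Smith,1", "Eve,Brown,3"))
      .map { line =>
        val tokens = line.split(",")
        (tokens(2), (tokens(0), tokens(1)))
      }
      .distinct

    // Made-up rows standing in for cities.csv (key in the first column).
    val cities = sc
      .parallelize(Seq("1,Paris", "2,Oslo", "4,Lima"))
      .map { line =>
        val tokens = line.split(",")
        (tokens(0), tokens(1))
      }

    // Transformations only: nothing has been executed so far.
    val inner = people.join(cities)
    val left  = people.leftOuterJoin(cities)
    val right = people.rightOuterJoin(cities)

    // Each action below submits a job; all three share the lineage of
    // `people` and `cities`, which is why the same stages show up again.
    (inner.count(), left.count(), right.count())
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("three-joins")
    val sc = new SparkContext(conf)
    try println(joinCounts(sc))
    finally sc.stop()
  }
}
```

If you keep the application alive and open the Spark UI, you would expect to see the stage that writes the shuffle for distinct executed in the first job and reported as skipped in the later ones, since all three jobs depend on that same shuffle.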
why are stages 4 and 9 skipped
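Often, Spark will skip execution of some stages. The grayed-out stages are ones whose output was already computed by an earlier job (their shuffle output is still available), so Spark reuses it instead of running the stage again, which improves performance.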
How can I know which stages will appear in more than one job just by looking at the Java code (before executing anything)?
That's what the RDD lineage (graph) offers. You can print it with toDebugString before running any action:
scala> people.leftOuterJoin(cities).toDebugString
res15: String =
(3) MapPartitionsRDD[99] at leftOuterJoin at <console>:28 []
 |  MapPartitionsRDD[98] at leftOuterJoin at <console>:28 []
 |  CoGroupedRDD[97] at leftOuterJoin at <console>:28 []
 +-(2) MapPartitionsRDD[81] at distinct at <console>:27 []
 |  |  ShuffledRDD[80] at distinct at <console>:27 []
 |  +-(2) MapPartitionsRDD[79] at distinct at <console>:27 []
 |     |  MapPartitionsRDD[78] at map at <console>:24 []
 |     |  people.csv MapPartitionsRDD[77] at textFile at <console>:24 []
 |     |  people.csv HadoopRDD[76] at textFile at <console>:24 []
 +-(3) MapPartitionsRDD[84] at map at <console>:29 []
    |  cities.csv MapPartitionsRDD[83] at textFile at <console>:29 []
    |  cities.csv HadoopRDD[82] at textFile at <console>:29 []
As you can see yourself, you will end up with 4 stages since there are 3 shuffle dependencies (the edges with the numbers of partitions).
The numbers in round brackets are the numbers of partitions that DAGScheduler will eventually use to create task sets with the exact number of tasks: one TaskSet per stage.
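The same count can be read off the lineage programmatically instead of by eye. The sketch below walks RDD.dependencies and collects the distinct ShuffleDependency instances; for a single job, the number of stages is then that count plus one (the final result stage). The object name LineageStages, the helper names, and the made-up in-memory rows are assumptions for illustration, not part of the original code.

```scala
import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutable

object LineageStages {
  // Walks the lineage from `rdd` back to its sources and returns the ids of
  // all distinct shuffle dependencies found on the way.
  def shuffleIds(rdd: RDD[_]): Set[Int] = {
    val visited = mutable.Set.empty[Int] // RDD ids already walked
    val ids = mutable.Set.empty[Int]     // shuffle ids found so far
    def visit(r: RDD[_]): Unit =
      if (visited.add(r.id)) {
        r.dependencies.foreach { dep =>
          dep match {
            case s: ShuffleDependency[_, _, _] => ids += s.shuffleId
            case _                             => // narrow dependency, same stage
          }
          visit(dep.rdd)
        }
      }
    visit(rdd)
    ids.toSet
  }

  // Hypothetical stand-in for people.leftOuterJoin(cities), built from
  // made-up rows so that no input files are needed.
  def leftJoin(sc: SparkContext): RDD[(String, ((String, String), Option[String]))] = {
    val people = sc
      .parallelize(Seq("Ann,Smith,1", "Bob,Jones,2", "Ann,Smith,1"))
      .map { line =>
        val tokens = line.split(",")
        (tokens(2), (tokens(0), tokens(1)))
      }
      .distinct
    val cities = sc
      .parallelize(Seq("1,Paris", "2,Oslo"))
      .map { line =>
        val tokens = line.split(",")
        (tokens(0), tokens(1))
      }
    people.leftOuterJoin(cities)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("lineage-stages")
    val sc = new SparkContext(conf)
    try {
      val shuffles = shuffleIds(leftJoin(sc)).size
      // No action is run here: only the lineage is inspected.
      println(s"shuffle dependencies: $shuffles, expected stages: ${shuffles + 1}")
    } finally sc.stop()
  }
}
```

Because shuffle ids identify the shuffle itself, two RDDs whose lineages contain the same id share that shuffle, and the stage feeding it is a candidate for being skipped in a later job.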