I ran the following job in the spark-shell:
val d = sc.parallelize(0 until 1000000).map(i => (i%100000, i)).persist
d.join(d.reduceByKey(_ + _)).collect
The Spark UI shows three stages. Stages 4 and 5 correspond to the computation of d, and stage 6 corresponds to the computation of the collect action. Since d is persisted, I would expect only two stages. However, stage 5 is present but not connected to any other stage.
So I tried running the same computation without persist, and the DAG looks identical, except without the green dots indicating the RDD has been persisted. I would expect the output of stage 11 to be connected to the input of stage 12, but it is not.
Looking at the stage descriptions, the stages seem to indicate that d is being persisted, because stage 5 has input, but I am still confused as to why stage 5 even exists.
Stage Skipped means that the data has been fetched from cache and re-execution of the given stage is not required. Basically, the stage has been evaluated before, and its result is available without re-execution. This is consistent with your DAG, which shows that the next stage requires a shuffle (reduceByKey).
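You can reproduce this with a minimal sketch (the variable names here are illustrative, not from the question): run the same shuffle-producing job twice in the spark-shell and compare the two jobs in the UI. Because the shuffle output of the map-side stage is reused, the second job shows that stage as skipped.
val pairs = sc.parallelize(0 until 1000).map(i => (i % 10, i))
val summed = pairs.reduceByKey(_ + _)
summed.collect() // first job: the map-side stage and the result stage both run
summed.collect() // second job: the map-side shuffle stage is shown as skipped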
When you click on a job on the summary page, you see the details page for that job, which shows the event timeline, the DAG visualization, and all stages of the job.
A stage in Spark represents a segment of the DAG computation that can be completed locally. A stage breaks on an operation that requires a shuffle of data, which is why you'll see it named by that operation in the Spark UI.
In the Spark directed acyclic graph (DAG), every edge points from an earlier operation to a later one; when an action is called, the DAG that has been built up is submitted to the DAG scheduler, which splits the graph into stages of tasks. The Spark DAG is a strict generalization of the MapReduce model.
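To see where those stage boundaries fall without the UI, here is a small sketch (illustrative names, not from the question) that prints an RDD's lineage; in the output of toDebugString, a new indentation level marks a shuffle dependency, i.e. a stage boundary:
// map is a narrow transformation and is chained into the same stage;
// reduceByKey is a wide transformation, forcing a shuffle and a new stage
val rdd = sc.parallelize(0 until 100)
  .map(i => (i % 10, i))
  .reduceByKey(_ + _)
println(rdd.toDebugString)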
The input RDD is cached and the cached part is not recomputed.
This can be validated with a simple test:
import org.apache.spark.SparkContext

def f(sc: SparkContext) = {
  // Counts how many times the map function is actually executed.
  val counter = sc.longAccumulator("counter")
  val rdd = sc.parallelize(0 until 100).map(i => {
    counter.add(1L)
    (i % 10, i)
  }).persist

  // The cached RDD is used twice, but each element is computed only once.
  rdd.join(rdd.reduceByKey(_ + _)).foreach(_ => ())
  counter.value
}

assert(f(spark.sparkContext) == 100)
Caching doesn't remove stages from the DAG.
If data is cached, the corresponding stages can be marked as skipped, but they are still part of the DAG. Lineage can be truncated with checkpoints, but that is not the same thing, and it doesn't remove stages from the visualization.
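For comparison, here is a hedged sketch of checkpointing (the checkpoint directory is a placeholder; adjust it for your environment), which, unlike caching, does truncate the lineage:
// Checkpointing writes the RDD to reliable storage and cuts off its lineage.
sc.setCheckpointDir("/tmp/spark-checkpoints") // placeholder path
val base = sc.parallelize(0 until 1000).map(i => (i % 10, i))
base.checkpoint()            // mark for checkpointing; happens at the next action
base.count()                 // action that materializes the checkpoint
println(base.toDebugString)  // the lineage now starts from the checkpointed data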
Input stages contain more than cached computations.
Spark stages group together operations which can be chained without performing a shuffle.
While part of the input stage is cached, it doesn't cover all the operations required to prepare the shuffle files. This is why you don't see skipped tasks.
The rest (the detachment) is just a limitation of the graph visualization.
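If you want to check this outside the visualization, one hedged way (assuming d from the question is still defined and has already been materialized by an action) is to print the lineage of the joined RDD; the cached map output appears once, while the shuffle dependencies for both branches are still listed on top of it:
// Both branches (the join input and the reduceByKey input) read the same cached d,
// but each still performs its own shuffle write on top of the cached data.
val joined = d.join(d.reduceByKey(_ + _))
println(joined.toDebugString)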
If you repartition the data first:
import org.apache.spark.HashPartitioner
val d = sc.parallelize(0 until 1000000)
.map(i => (i%100000, i))
.partitionBy(new HashPartitioner(20))
d.join(d.reduceByKey(_ + _)).collect
you'll get the DAG you're most likely looking for: the shuffle happens once up front in partitionBy, and since reduceByKey and join can reuse that partitioning, the stages are connected in the visualization.