
Spark UI DAG stage disconnected

I ran the following job in the spark-shell:

val d = sc.parallelize(0 until 1000000).map(i => (i%100000, i)).persist
d.join(d.reduceByKey(_ + _)).collect

The Spark UI shows three stages. Stages 4 and 5 correspond to the computation of d, and stage 6 corresponds to the computation of the collect action. Since d is persisted, I would expect only two stages. However, stage 5 is present but not connected to any other stage.

Spark UI DAG

So I tried running the same computation without using persist, and the DAG looks identical, except without the green dots indicating the RDD has been persisted.

Spark UI DAG without persist

I would expect the output of stage 11 to be connected to the input of stage 12, but it is not.

Looking at the stage descriptions, the stages seem to indicate that d is being persisted, because stage 5 has input, but I am still confused as to why stage 5 even exists.

Spark UI stages

Spark UI stages without persist

asked Nov 16 '16 by Justin Raymond

People also ask

Why do stages get skipped in Spark?

"Stage skipped" means that data has been fetched from the cache and re-execution of the given stage is not required. In other words, the stage has been evaluated before, and its result is available without re-execution. This is consistent with your DAG, which shows that the next stage requires shuffling (reduceByKey).
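
For illustration, a minimal spark-shell sketch (identifiers are arbitrary, not from the question): re-running an action on an RDD that depends on a shuffle reuses the existing shuffle files, so the UI typically marks the parent stage as skipped.

val pairs = sc.parallelize(0 until 1000).map(i => (i % 10, i))
val sums = pairs.reduceByKey(_ + _) // shuffle boundary: ends the first stage
sums.collect() // first run: the map stage executes and writes shuffle files
sums.collect() // second run: the map stage is typically shown as skipped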

Where is the DAG in Spark UI?

When you click on a job on the summary page, you see the details page for that job. The details page shows the event timeline, the DAG visualization, and all stages of the job.

What does stage mean in Pyspark?

A stage in Spark represents a segment of the DAG computation that can be completed locally, without shuffling data across the cluster. A stage breaks on an operation that requires a shuffle, which is why you'll see it named after that operation in the Spark UI.
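
As a sketch (names are arbitrary), narrow transformations chain into a single stage, while a wide transformation such as reduceByKey starts a new one:

val r = sc.parallelize(0 until 100)
  .map(i => (i % 10, i)) // narrow: chained into the same stage
  .filter(_._2 > 5)      // narrow: still the same stage
  .reduceByKey(_ + _)    // wide: requires a shuffle, so a new stage begins
r.collect()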

What is the working of DAG in Spark?

In a Spark directed acyclic graph (DAG), every edge points from an earlier step to a later one in the sequence; when an action is called, the DAG built so far is submitted to the DAG Scheduler, which splits the graph into stages of tasks. The Spark DAG is a strict generalization of the MapReduce model.
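
A minimal illustration of this laziness (identifiers are arbitrary): transformations only build the DAG, and nothing runs until an action submits it to the scheduler.

val doubled = sc.parallelize(1 to 100).map(_ * 2) // lazy: only extends the DAG
doubled.count() // action: the DAG is submitted to the DAGScheduler and split into stages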


1 Answer

  1. The input RDD is cached, and the cached part is not recomputed.

    This can be validated with a simple test:

    import org.apache.spark.SparkContext
    
    def f(sc: SparkContext) = {
      // The accumulator counts how many times the map function is evaluated.
      val counter = sc.longAccumulator("counter")
      val rdd = sc.parallelize(0 until 100).map(i => {
        counter.add(1L)
        (i % 10, i)
      }).persist
      rdd.join(rdd.reduceByKey(_ + _)).foreach(_ => ())
      counter.value
    }
    
    // Exactly 100 evaluations, even though rdd is used twice:
    // the cached partitions are not recomputed.
    assert(f(spark.sparkContext) == 100)
    
  2. Caching doesn't remove stages from the DAG.

    If data is cached, the corresponding stages can be marked as skipped, but they are still part of the DAG. Lineage can be truncated using checkpoints, but that is not the same thing, and it doesn't remove stages from the visualization (see the sketch below).
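
    As a sketch (the checkpoint directory path is an arbitrary assumption, not from the answer), checkpointing truncates the lineage while persist does not:

    sc.setCheckpointDir("/tmp/spark-checkpoints") // arbitrary example path
    val rdd = sc.parallelize(0 until 100).map(i => (i % 10, i))
    rdd.checkpoint()           // mark for checkpointing before any action runs
    rdd.count()                // the first action materializes the checkpoint
    println(rdd.toDebugString) // the lineage is now rooted at the checkpointed data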

  3. Input stages contain more than the cached computation.

    Spark stages group together operations that can be chained without performing a shuffle.

    While part of the input stage is cached, the cache doesn't cover all the operations required to prepare the shuffle files. This is why you don't see skipped tasks (see the lineage sketch below).
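
    One way to see this (a sketch; exact output varies by Spark version) is to inspect the lineage: the cached MapPartitionsRDD is only part of what the input stage computes, because the stage also writes the shuffle files consumed by reduceByKey and join.

    val d = sc.parallelize(0 until 1000).map(i => (i % 10, i)).persist
    d.count()                                   // materialize the cache
    println(d.reduceByKey(_ + _).toDebugString) // shows the ShuffledRDD on top of the cached RDD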

  4. The rest (the detached stage) is just a limitation of the graph visualization.

  5. If you repartition the data first:

    import org.apache.spark.HashPartitioner
    
    val d = sc.parallelize(0 until 1000000)
      .map(i => (i%100000, i))
      .partitionBy(new HashPartitioner(20))
    
    d.join(d.reduceByKey(_ + _)).collect
    

    you'll get the DAG you're most likely looking for:

    Spark UI DAG after repartitioning

answered Oct 11 '22 by zero323