
Many skipped stages for Pregel in Spark UI

I am trying to run connected components on a logNormalGraph.

val graph: Graph[Long, Int] = GraphGenerators.
    logNormalGraph(context.spark, numEParts = 10, numVertices = 1000000,
        mu = 0.01, sigma = 0.01)

val minGraph = graph.connectedComponents()

In the Spark UI, every subsequent job shows a steadily growing number of skipped stages (job number, then stage counts):

1 - 4/4 (12 skipped)
2 - 4/4 (23 skipped)
50 - 4/4 (4079 skipped)

Why are there so many skipped stages when I run something on Pregel, and why does this number grow so fast (non-linearly)?

Alexander Ponomarev asked Apr 12 '16 13:04


1 Answer

Step by step. The connectedComponents function is implemented using the Pregel API. Ignoring algorithm-specific details, each iteration:

  • calls joinVertices and caches the output
  • calls mapReduceTriplets to compute the next set of messages
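
The two steps above can be sketched without Spark at all. The following is only a rough plain-Scala model of the loop, using ordinary collections instead of a Graph; the names PregelLoopSketch, aggregate and run are made up for this illustration and are not part of GraphX, and the real implementation also stops early when no messages are left.

```scala
// Plain-Scala model of the Pregel-style loop (no Spark involved).
// PregelLoopSketch, aggregate and run are hypothetical names, not GraphX API.
object PregelLoopSketch {
  type VertexId = Long

  // Dummy message function: every edge forwards the source label to the destination.
  def sendMsg(src: VertexId, dst: VertexId, labels: Map[VertexId, Long]): (VertexId, Long) =
    (dst, labels(src))

  // Vertex program and message combiner both keep the smaller label.
  val vprog: (VertexId, Long, Long) => Long = (_, attr, msg) => math.min(attr, msg)
  val mergeMsg: (Long, Long) => Long = (a, b) => math.min(a, b)

  // Stand-in for mapReduceTriplets: group messages by destination, reduce with mergeMsg.
  def aggregate(edges: Seq[(VertexId, VertexId)],
                labels: Map[VertexId, Long]): Map[VertexId, Long] =
    edges
      .map { case (src, dst) => sendMsg(src, dst, labels) }
      .groupBy(_._1)
      .map { case (dst, msgs) => dst -> msgs.map(_._2).reduce(mergeMsg) }

  def run(vertices: Seq[VertexId],
          edges: Seq[(VertexId, VertexId)],
          iterations: Int): Map[VertexId, Long] = {
    // Every vertex starts with its own id as its label.
    var labels: Map[VertexId, Long] = vertices.map(v => v -> v).toMap
    var messages = aggregate(edges, labels)
    for (_ <- 1 to iterations) {
      // Stand-in for joinVertices: vertices without a message keep their label.
      labels = labels.map { case (id, attr) =>
        id -> messages.get(id).fold(attr)(msg => vprog(id, attr, msg))
      }
      // The next messages depend on the labels that were just produced.
      messages = aggregate(edges, labels)
    }
    labels
  }
}
```

Note that each pass reads the previous labels twice, once in the join and once when computing the new messages. That double dependency is what matters for the rest of this answer.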

First, let's create a dummy sendMsg:

import org.apache.spark.graphx._

def sendMsg(edge: EdgeTriplet[VertexId, Int]):
    Iterator[(VertexId, VertexId)] = {
  // Forward the source vertex attribute to the destination vertex.
  Iterator((edge.dstId, edge.srcAttr))
}

then vprog:

val vprog = (id: Long, attr: Long, msg: Long) => math.min(attr, msg)

and mergeMsg:

val mergeMsg = (a: Long, b: Long) => math.min(a, b)

Next, we can initialize an example graph:

import org.apache.spark.graphx.util.GraphGenerators

val graph = GraphGenerators.logNormalGraph(
   sc, numEParts = 10, numVertices = 100,  mu = 0.01, sigma = 0.01)
  .mapVertices { case (vid, _) => vid }

val g0 = graph
  .mapVertices((vid, vdata) => vprog(vid, vdata, Long.MaxValue))

and messages:

val messages0 = g0.mapReduceTriplets(sendMsg, mergeMsg).cache()

Since GraphXUtils is private, we have to use Graph methods directly.

When you take a look at the DAG generated by


you'll already see some skipped stages:

[image: DAG visualization]

After executing the first iteration

val g1 = g0.joinVertices(messages0)(vprog).cache()
val messages1 = g1.mapReduceTriplets(sendMsg, mergeMsg).cache()

the DAG will look more or less like this:

[image: DAG visualization]

If we continue:

val g2 = g1.joinVertices(messages1)(vprog).cache()
val messages2 = g2.mapReduceTriplets(sendMsg, mergeMsg).cache()

we get the following DAG:

[image: DAG visualization]

So what happened here:

  • we execute an iterative algorithm which takes a dependency on the same data twice, once for the join and once for message aggregation. This leads to an increasing number of stages on which g depends in each iteration.
  • since data is intensively cached (explicitly, as you can see in the code, and implicitly, by persisting shuffle files) and checkpointed (I could be wrong here, but checkpoints are typically marked as green dots), each stage has to be computed only once, even if multiple downstream stages depend on it.
  • after the data is initialized (g0, messages0), only the latest stages are computed from scratch.
  • if you take a closer look at the DAG, you'll see quite complex dependencies, which should account for the remaining discrepancy between the relatively slow growth of the DAG and the number of skipped stages.

The first property explains the growing number of stages; the second explains why stages are skipped.
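
To make the two properties more concrete, here is a toy model of the dependency pattern only. It is not Spark's scheduler and it will not reproduce the exact counts shown in the UI; the node types G and M and the object DependencySketch are invented for this sketch. It assumes g(i) depends on g(i-1) and m(i-1), and m(i) depends on g(i), as in the code above.

```scala
// Toy model of the lineage shape (not Spark's DAGScheduler).
// G, M and DependencySketch are hypothetical names for illustration.
object DependencySketch {
  sealed trait Node
  final case class G(i: Int) extends Node // graph after iteration i
  final case class M(i: Int) extends Node // messages computed from graph i

  // g(i) depends on g(i-1) and m(i-1); m(i) depends on g(i).
  def parents(n: Node): List[Node] = n match {
    case G(0) => Nil
    case G(i) => List(G(i - 1), M(i - 1))
    case M(i) => List(G(i))
  }

  // Units of work if nothing were reused: every dependency path is walked again.
  def naiveVisits(n: Node): Long = 1L + parents(n).map(naiveVisits).sum

  // Distinct nodes in the lineage: each one needs computing only once when reused.
  def distinctAncestors(n: Node): Set[Node] = {
    def visit(seen: Set[Node], node: Node): Set[Node] =
      if (seen(node)) seen else parents(node).foldLeft(seen + node)(visit)
    visit(Set.empty, n)
  }
}
```

In this model, DependencySketch.naiveVisits(M(10)) is 3071 while DependencySketch.distinctAncestors(M(10)).size is only 22: the same upstream results are referenced many times, but with caching each of them has to be materialized once.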

zero323 answered Sep 20 '22 17:09
