
Why does sortBy transformation trigger a Spark job?

According to the Spark documentation, only RDD actions can trigger a Spark job; transformations are evaluated lazily and run only when an action is called on the resulting RDD.

Yet I see that the sortBy transformation is applied immediately, and it shows up as a job in the Spark UI. Why?
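For example, in a plain spark-shell session (a minimal sketch; sc is the usual SparkContext):

scala> val rdd = sc.parallelize(1 to 100)
scala> val mapped = rdd.map(_ * 2)        // lazy: nothing appears in the Spark UI
scala> val sorted = rdd.sortBy(identity)  // a job appears in the Spark UI right away
scala> mapped.count()                     // an action: triggers a job, as documented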

asked Dec 30 '16 by Prabu Soundar Rajan


2 Answers

sortBy is implemented using sortByKey, which depends on a RangePartitioner (JVM) or a partitioning function (Python). When you call sortBy / sortByKey, the partitioner (partitioning function) is initialized eagerly and samples the input RDD to compute partition boundaries. The job you see corresponds to this sampling process.
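You can reproduce that eager sampling in isolation by constructing the partitioner yourself (a sketch; assumes a spark-shell with sc in scope):

scala> import org.apache.spark.RangePartitioner
scala> val pairs = sc.parallelize(0 until 1000).map(i => (i, i))
scala> val partitioner = new RangePartitioner(8, pairs)  // samples `pairs` to pick range boundaries; a job runs here, before any action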

The actual sorting is performed only when you execute an action on the newly created RDD or its descendants.
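For example (a sketch): collect is what finally runs the shuffle and sort; the job triggered earlier only sampled the data:

scala> val sorted = sc.parallelize(Seq(3, 1, 2)).sortBy(identity)  // sampling job only
scala> sorted.collect()  // the action: the shuffle and the actual sort happen now
res0: Array[Int] = Array(1, 2, 3)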

answered Sep 30 '22 by zero323


> As per the Spark documentation, only an action triggers a job in Spark; the transformations are lazily evaluated when an action is called on them.

In general you're right, but as you've just experienced, there are a few exceptions, and sortBy is among them (together with zipWithIndex).
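For instance, zipWithIndex has to know the size of every partition before it can assign global indices, so it launches a small counting job as soon as it is called (a sketch; with a single partition no job is needed):

scala> sc.parallelize(0 to 8, 4).zipWithIndex()  // a job runs here to count elements per partition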

As a matter of fact, it was reported in Spark's JIRA and closed with a Won't Fix resolution. See SPARK-1021 "sortByKey() launches a cluster job when it shouldn't".

You can see the job running with DAGScheduler logging enabled (and later in the web UI):

scala> sc.parallelize(0 to 8).sortBy(identity)
INFO DAGScheduler: Got job 1 (sortBy at <console>:25) with 8 output partitions
INFO DAGScheduler: Final stage: ResultStage 1 (sortBy at <console>:25)
INFO DAGScheduler: Parents of final stage: List()
INFO DAGScheduler: Missing parents: List()
DEBUG DAGScheduler: submitStage(ResultStage 1)
DEBUG DAGScheduler: missing: List()
INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[4] at sortBy at <console>:25), which has no missing parents
DEBUG DAGScheduler: submitMissingTasks(ResultStage 1)
INFO DAGScheduler: Submitting 8 missing tasks from ResultStage 1 (MapPartitionsRDD[4] at sortBy at <console>:25)
DEBUG DAGScheduler: New pending partitions: Set(0, 1, 5, 2, 6, 3, 7, 4)
INFO DAGScheduler: ResultStage 1 (sortBy at <console>:25) finished in 0.013 s
DEBUG DAGScheduler: After removal of stage 1, remaining stages = 0
INFO DAGScheduler: Job 1 finished: sortBy at <console>:25, took 0.019755 s
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at sortBy at <console>:25
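To get the DEBUG lines above, raising the scheduler's logger level should be enough. With the log4j 1.x setup older Spark versions ship with, a line like this in conf/log4j.properties works (an assumption about your logging setup; adjust accordingly if your Spark version uses log4j2):

log4j.logger.org.apache.spark.scheduler.DAGScheduler=DEBUG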
answered Sep 30 '22 by Jacek Laskowski