Experiment
I tried the following snippet on Spark 1.6.1
.
val soDF = sqlContext.read.parquet("/batchPoC/saleOrder") # This has 45 files
soDF.registerTempTable("so")
sqlContext.sql("select dpHour, count(*) as cnt from so group by dpHour order by cnt").write.parquet("/out/")
The Physical Plan
is:
== Physical Plan ==
Sort [cnt#59L ASC], true, 0
+- ConvertToUnsafe
+- Exchange rangepartitioning(cnt#59L ASC,200), None
+- ConvertToSafe
+- TungstenAggregate(key=[dpHour#38], functions=[(count(1),mode=Final,isDistinct=false)], output=[dpHour#38,cnt#59L])
+- TungstenExchange hashpartitioning(dpHour#38,200), None
+- TungstenAggregate(key=[dpHour#38], functions=[(count(1),mode=Partial,isDistinct=false)], output=[dpHour#38,count#63L])
+- Scan ParquetRelation[dpHour#38] InputPaths: hdfs://hdfsNode:8020/batchPoC/saleOrder
For this query, I got two Jobs: Job 9
and Job 10
For Job 9
, the DAG
is:
For Job 10
, the DAG
is:
Observations
jobs
for one query. Stage-16
(marked as Stage-14
in Job 9
) is skipped in Job 10
.Stage-15
's last RDD[48]
, is same as Stage-17
's last RDD[49]
. How? I saw in the logs that after Stage-15
execution, the RDD[48]
is registered as RDD[49]
Stage-17
is shown in the driver-logs
but never got executed at Executors
. On driver-logs
the task-execution is shown, but when I looked at Yarn
container's logs, there was no evidence of receiving any task
from Stage-17
. Logs supporting these observations (only driver-logs
, I lost executor
logs due to later crash). It is seen that before Stage-17
starts, RDD[49]
is registered:
16/06/10 22:11:22 INFO TaskSetManager: Finished task 196.0 in stage 15.0 (TID 1121) in 21 ms on slave-1 (199/200)
16/06/10 22:11:22 INFO TaskSetManager: Finished task 198.0 in stage 15.0 (TID 1123) in 20 ms on slave-1 (200/200)
16/06/10 22:11:22 INFO YarnScheduler: Removed TaskSet 15.0, whose tasks have all completed, from pool
16/06/10 22:11:22 INFO DAGScheduler: ResultStage 15 (parquet at <console>:26) finished in 0.505 s
16/06/10 22:11:22 INFO DAGScheduler: Job 9 finished: parquet at <console>:26, took 5.054011 s
16/06/10 22:11:22 INFO ParquetRelation: Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter
16/06/10 22:11:22 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
16/06/10 22:11:22 INFO DefaultWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
16/06/10 22:11:22 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
16/06/10 22:11:22 INFO SparkContext: Starting job: parquet at <console>:26
16/06/10 22:11:22 INFO DAGScheduler: Registering RDD 49 (parquet at <console>:26)
16/06/10 22:11:22 INFO DAGScheduler: Got job 10 (parquet at <console>:26) with 25 output partitions
16/06/10 22:11:22 INFO DAGScheduler: Final stage: ResultStage 18 (parquet at <console>:26)
16/06/10 22:11:22 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 17)
16/06/10 22:11:22 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 17)
16/06/10 22:11:22 INFO DAGScheduler: Submitting ShuffleMapStage 17 (MapPartitionsRDD[49] at parquet at <console>:26), which has no missing parents
16/06/10 22:11:22 INFO MemoryStore: Block broadcast_25 stored as values in memory (estimated size 17.4 KB, free 512.3 KB)
16/06/10 22:11:22 INFO MemoryStore: Block broadcast_25_piece0 stored as bytes in memory (estimated size 8.9 KB, free 521.2 KB)
16/06/10 22:11:22 INFO BlockManagerInfo: Added broadcast_25_piece0 in memory on 172.16.20.57:44944 (size: 8.9 KB, free: 517.3 MB)
16/06/10 22:11:22 INFO SparkContext: Created broadcast 25 from broadcast at DAGScheduler.scala:1006
16/06/10 22:11:22 INFO DAGScheduler: Submitting 200 missing tasks from ShuffleMapStage 17 (MapPartitionsRDD[49] at parquet at <console>:26)
16/06/10 22:11:22 INFO YarnScheduler: Adding task set 17.0 with 200 tasks
16/06/10 22:11:23 INFO TaskSetManager: Starting task 0.0 in stage 17.0 (TID 1125, slave-1, partition 0,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 1.0 in stage 17.0 (TID 1126, slave-2, partition 1,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 2.0 in stage 17.0 (TID 1127, slave-1, partition 2,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 3.0 in stage 17.0 (TID 1128, slave-2, partition 3,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 4.0 in stage 17.0 (TID 1129, slave-1, partition 4,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 5.0 in stage 17.0 (TID 1130, slave-2, partition 5,NODE_LOCAL, 1988 bytes)
Questions
Jobs
? What is the intention here by breaking a DAG
into two jobs
?Job 10
's DAG
looks complete for the query execution. Is there anything specific Job 9
is doing?Stage-17
is not Skipped? It looks like dummy tasks
are created, do they have any purpose.Later, I tried another rather simpler query. Unexpectedly, it was creating 3 Jobs
.
sqlContext.sql("select dpHour from so order by dphour").write.parquet("/out2/")
When you are using the high-level dataframe/dataset APIs, you leave it up to Spark to determine the execution plan, including the job/stage chunking. These depend on many factors such as execution parallelism, cached/persisted data structures, etc. In future versions of Spark, as the optimizer sophistication increases, you may see even more jobs per query as, for example, some data sources are sampled to parameterize cost-based execution optimization.
For example, I have frequently, but not always, seen writing generate separate jobs from processing that involves shuffles.
Bottom line, if you are using the high-level APIs, unless you have to do extremely detailed optimization with huge data volumes, it rarely pays to dig into the specific chunking. Job startup costs are extremely low compared to processing/output.
If, on the other hand, you are curious about the Spark internals, read the optimizer code and engage on the Spark developer mailing list.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With