I use Spark 2.1.2.
I am trying to understand the various Spark UI tab displays as a job runs. I use spark-shell --master local and run the following join query:
val df = Seq(
(55, "Canada", -1, "", 0),
(77, "Ontario", 55, "/55", 1),
(100, "Toronto", 77, "/55/77", 2),
(104, "Brampton", 100, "/55/77/100", 3)
).toDF("id", "name", "parentId", "path", "depth")
val dfWithPar = df.as("df1").
join(df.as("df2"), $"df1.parentId" === $"df2.Id", "leftouter").
select($"df1.*", $"df2.name" as "parentName")
dfWithPar.show
This is the physical query plan:
== Physical Plan ==
*Project [Id#11, name#12, parentId#13, path#14, depth#15, name#25 AS parentName#63]
+- *BroadcastHashJoin [parentId#13], [Id#24], LeftOuter, BuildRight
   :- LocalTableScan [Id#11, name#12, parentId#13, path#14, depth#15]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- LocalTableScan [Id#24, name#25]
I've got two questions about the query execution.
Why are there two jobs for the query?
Why are the stage views shown for both jobs identical? Below is a screenshot of the stage view of job id 1, which is exactly the same as that of job id 0. Why?
I use Spark 2.3.0 to answer your question (2.3.1-SNAPSHOT actually) since it is the latest and greatest at the time of this writing. That changes very little about query execution (if anything important) as the physical query plans in your 2.1.2 and my 2.3.0 are exactly the same (except the per-query codegen stage ID in round brackets).
After dfWithPar.show, the structured query (that you built using Spark SQL's Dataset API for Scala) is optimized to the following physical query plan (I'm including it in my answer for better comprehension):
scala> dfWithPar.explain
== Physical Plan ==
*(1) Project [Id#11, name#12, parentId#13, path#14, depth#15, name#24 AS parentName#58]
+- *(1) BroadcastHashJoin [parentId#13], [Id#23], LeftOuter, BuildRight
   :- LocalTableScan [Id#11, name#12, parentId#13, path#14, depth#15]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [Id#23, name#24]
Why are there two jobs for the query?
I'd say there are even three Spark jobs.
tl;dr One Spark job is for the BroadcastHashJoinExec physical operator, whereas the other two are for Dataset.show.
In order to understand the query execution and the number of Spark jobs of a structured query, it is important to understand the difference between structured queries (described using Dataset API) and RDD API.
Spark SQL's Datasets and Spark Core's RDDs both describe distributed computations in Spark. RDDs are the Spark "assembler" language (akin to the JVM bytecode) while Datasets are higher-level descriptions of structured queries using SQL-like language (akin to JVM languages like Scala or Java as compared to the JVM bytecode I used earlier).
What's important is that structured queries using Dataset API eventually end up as a RDD-based distributed computation (which could be compared to how the Java or Scala compilers transform the higher-level languages to the JVM bytecode).
Dataset API is an abstraction over RDD API and when you call an action on a DataFrame or Dataset that action transforms them to RDDs.
With that, you should not be surprised that Dataset.show will in the end call an RDD action that in turn runs zero, one or many Spark jobs.
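If you want to see that RDD-based form yourself, every Dataset exposes it through its query execution (a small sketch of mine using developer APIs, not something from the original question):
// The physical plan compiled down to an RDD of Spark's internal binary rows.
// Note that, as described further below, producing this RDD for a broadcast join
// already submits the Spark job for the broadcast variable.
val internalRdd = dfWithPar.queryExecution.toRdd   // RDD[InternalRow]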
Dataset.show (with numRows equal to 20 by default) in the end calls showString, which does take(numRows + 1) to get an Array[Row]:
val takeResult = newDf.select(castCols: _*).take(numRows + 1)
In other words, dfWithPar.show() is equivalent to dfWithPar.take(21), which in turn is equivalent to dfWithPar.head(21), as far as the number of Spark jobs is concerned.
You can see them and their number of jobs in the SQL tab. They should all be equal.
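A quick way to convince yourself (a small sketch, not from the original question) is to run the three variants and compare the Jobs tab after each one:
// All three should trigger the same number of Spark jobs for this query;
// check the Spark UI (http://localhost:4040 by default) after each call.
dfWithPar.show()     // numRows = 20, so take(21) under the covers
dfWithPar.take(21)
dfWithPar.head(21)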
show or take or head all lead to collectFromPlan that triggers the Spark jobs (by calling executeCollect).
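You can reach the same code path by hand through the executed physical plan (again internal/developer APIs, shown only as a sketch):
// executeCollect returns rows in Spark's internal format (Array[InternalRow]);
// this is what collectFromPlan works with before rows are converted for show/take/head.
dfWithPar.queryExecution.executedPlan.executeCollect()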
To answer your question about the number of jobs, you need to know how all the physical operators in the query work. You simply have to know their behaviour at runtime and whether they trigger Spark jobs at all.
The BroadcastHashJoinExec binary physical operator is used when the right side of a join can be broadcast, which is controlled by spark.sql.autoBroadcastJoinThreshold (10MB by default).
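You can check the threshold in the same spark-shell session (a small aside of mine, not part of the original answer):
// 10485760 bytes (10MB) unless you have changed it
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")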
The BroadcastExchangeExec unary physical operator is used to broadcast rows (of a relation) to worker nodes (to support BroadcastHashJoinExec).
When BroadcastHashJoinExec is executed (to generate an RDD[InternalRow]), it creates a broadcast variable that in turn executes BroadcastExchangeExec (on a separate thread).
That's why Spark job 0, described as run at ThreadPoolExecutor.java:1149, was run.
You could see the single Spark job 0 ran if you executed the following:
// Just a single Spark job for the broadcast variable
val r = dfWithPar.rdd
That requires that the structured query is executed to produce an RDD, which is then the target of the action that gives the final result. You would not have had that Spark job if you had not ended up with a broadcast join query.
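If you want to verify that, one experiment (my own sketch, not part of the original answer) is to disable automatic broadcast joins and plan the same query again:
// With automatic broadcasts disabled, the same query should plan a SortMergeJoin
// instead, so producing the RDD no longer submits a broadcast-related job.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
val dfNoBroadcast = df.as("df1").
  join(df.as("df2"), $"df1.parentId" === $"df2.Id", "leftouter").
  select($"df1.*", $"df2.name" as "parentName")
dfNoBroadcast.explain  // expect SortMergeJoin (+ Exchange) rather than BroadcastHashJoin
spark.conf.unset("spark.sql.autoBroadcastJoinThreshold")  // back to the 10MB default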
What I missed at the very first moment when I answered the question was that the Dataset operators, i.e. show, take and head, will in the end lead to RDD.take.
take(num: Int): Array[T] Take the first num elements of the RDD. It works by first scanning one partition, and use the results from that partition to estimate the number of additional partitions needed to satisfy the limit.
Please note that take says "It works by first scanning one partition, and use the results from that partition to estimate the number of additional partitions needed to satisfy the limit." That's the key to understanding the number of Spark jobs in your broadcast join query.
Every iteration (in the description above) is a separate Spark job, starting with the very first partition and scanning up to 4 times as many partitions in every following iteration:
// RDD.take
def take(num: Int): Array[T] = withScope {
...
while (buf.size < num && partsScanned < totalParts) {
...
val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
...
}
}
Have a look at the following RDD.take with 21 rows.
// The other two Spark jobs
r.take(21)
You will get 2 Spark jobs as in your query.
Guess how many Spark jobs you will have if you executed dfWithPar.show(1).
Why are the stage views shown for both jobs identical? Below is a screenshot of the stage view of job id 1 which is exactly the same as that of job id 0. Why?
That's easy to answer since both Spark jobs are from RDD.take(21).
The first Spark job is to scan the first partition and, since it did not have enough rows, another Spark job was run to scan more partitions.
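A tiny, self-contained illustration of that behaviour (my own example, unrelated to the join query above):
// Four partitions with one element each: take(3) cannot be satisfied by the
// first partition alone, so a second Spark job scans further partitions.
val tiny = sc.parallelize(1 to 4, 4)
tiny.take(3)   // you should see 2 Spark jobs in the Jobs tab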