
How to know the number of Spark jobs and stages in (broadcast) join query?

I use Spark 2.1.2.

I am trying to understand how the various Spark web UI tabs display as a job runs. I use spark-shell --master local and run the following join query:

val df = Seq(
  (55, "Canada", -1, "", 0),
  (77, "Ontario", 55, "/55", 1),
  (100, "Toronto", 77, "/55/77", 2),
  (104, "Brampton", 100, "/55/77/100", 3)
).toDF("id", "name", "parentId", "path", "depth")

val dfWithPar = df.as("df1").
  join(df.as("df2"), $"df1.parentId" === $"df2.Id", "leftouter").
  select($"df1.*", $"df2.name" as "parentName")

dfWithPar.show

This is the physical query plan:

== Physical Plan ==
*Project [Id#11, name#12, parentId#13, path#14, depth#15, name#25 AS parentName#63]
+- *BroadcastHashJoin [parentId#13], [Id#24], LeftOuter, BuildRight
   :- LocalTableScan [Id#11, name#12, parentId#13, path#14, depth#15]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- LocalTableScan [Id#24, name#25]

I've got two questions about the query execution.

  1. Why are there two jobs for the query?

    Spark job view

  2. Why is the stage view shown for both jobs identical? Below is a screenshot of the stage view of job id 1, which is exactly the same as that of job id 0. Why?

    Stage view of Stage 1, which is exactly the same as that of Stage 0

Asked Mar 20 '18 by sujit

People also ask

How do you know how many stages a Spark job has?

There are basically two kinds of stages in Spark: ShuffleMapStage and ResultStage. A ShuffleMapStage is an intermediate stage whose tasks prepare data for subsequent stages, whereas a ResultStage is the final stage that runs the Spark function for a particular set of tasks in the Spark job.

How many stages are there in a Spark job?

Basically, there are two types of stages in Spark: ShuffleMapStage and ResultStage.

How do I track my Spark job?

Click Analytics > Spark Analytics > Open the Spark Application Monitoring Page. Click Monitor > Workloads, and then click the Spark tab. This page displays the user names of the clusters that you are authorized to monitor and the number of applications that are currently running in each cluster.

What is the threshold limit for broadcast join in Spark?

Spark also internally maintains a threshold on the table size to automatically apply broadcast joins. The threshold can be configured using spark.sql.autoBroadcastJoinThreshold, which is 10MB by default.
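
For reference, the threshold can be read and changed at runtime through the SparkSession configuration. A minimal sketch for spark-shell (assuming spark is the usual SparkSession):

// Read the current threshold (10485760 bytes = 10MB by default)
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

// Raise it to 50MB, or set it to -1 to disable automatic broadcast joins entirely
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50L * 1024 * 1024)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)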


1 Answer

I use Spark 2.3.0 to answer your question (2.3.1-SNAPSHOT actually) since it is the latest and greatest at the time of this writing. That changes very little about query execution (if anything important) as the physical query plans in your 2.1.2 and my 2.3.0 are exactly the same (except the per-query codegen stage ID in round brackets).


After dfWithPar.show the structured query (that you built using Spark SQL's Dataset API for Scala) is optimized to the following physical query plan (I'm including it in my answer for better comprehension).

scala> dfWithPar.explain
== Physical Plan ==
*(1) Project [Id#11, name#12, parentId#13, path#14, depth#15, name#24 AS parentName#58]
+- *(1) BroadcastHashJoin [parentId#13], [Id#23], LeftOuter, BuildRight
   :- LocalTableScan [Id#11, name#12, parentId#13, path#14, depth#15]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [Id#23, name#24]

Number of Spark Jobs

Why are there two jobs for the query?

I'd say there are even three Spark jobs.

Spark jobs of the broadcast join query in web UI

tl;dr One Spark job is for BroadcastHashJoinExec physical operator whereas the other two are for Dataset.show.

In order to understand the query execution and the number of Spark jobs of a structured query, it is important to understand the difference between structured queries (described using Dataset API) and RDD API.

Spark SQL's Datasets and Spark Core's RDDs both describe distributed computations in Spark. RDDs are the Spark "assembler" language (akin to the JVM bytecode) while Datasets are higher-level descriptions of structured queries using a SQL-like language (akin to JVM languages like Scala or Java, as compared to the JVM bytecode in the earlier analogy).

What's important is that structured queries using Dataset API eventually end up as a RDD-based distributed computation (which could be compared to how the Java or Scala compilers transform the higher-level languages to the JVM bytecode).

Dataset API is an abstraction over RDD API and when you call an action on a DataFrame or Dataset that action transforms them to RDDs.
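
If you want to see that translation for yourself, you can peek at the query execution pipeline behind the Dataset. A small sketch (internalRows is just a local name for this sketch; note that for this particular query, building the RDD already kicks off the broadcast job, as discussed further below):

// The whole pipeline: parsed, analyzed, optimized and physical plans
dfWithPar.queryExecution

// The RDD[InternalRow] the physical plan compiles down to
val internalRows = dfWithPar.queryExecution.toRdd
internalRows.toDebugString   // the RDD lineage of the structured query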

With that, you should not be surprised that Dataset.show will in the end call an RDD action that in turn will run zero, one or many Spark jobs.

Dataset.show (with numRows equal to 20 by default) in the end calls showString, which does take(numRows + 1) to get an Array[Row].

val takeResult = newDf.select(castCols: _*).take(numRows + 1)

In other words, dfWithPar.show() is equivalent to dfWithPar.take(21), which in turn is equivalent to dfWithPar.head(21), as far as the number of Spark jobs is concerned.
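
If you want to confirm the equivalence yourself, here is a quick sketch to paste into spark-shell; all three trigger the same physical query:

dfWithPar.show()     // showString -> take(20 + 1) under the covers
dfWithPar.take(21)   // Array[Row] with at most 21 rows
dfWithPar.head(21)   // same execution path as take(21)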

You can see them and their number of jobs in the SQL tab. They should all be equal.

SQL tab in web UI

show or take or head all lead to collectFromPlan that triggers the Spark jobs (by calling executeCollect).

The key to answering your question about the number of jobs is to know how all the physical operators in the query work. You simply have to know their behaviour at runtime and whether they trigger Spark jobs at all.
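
A quick way to list those operators for this query is to walk the physical plan tree (collect here traverses the plan, not the data):

// Names of the physical operators in the executed plan
dfWithPar.queryExecution.executedPlan.collect { case op => op.nodeName }
// yields names like WholeStageCodegen, Project, BroadcastHashJoin, BroadcastExchange, LocalTableScan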


BroadcastHashJoinExec and BroadcastExchangeExec Physical Operators

BroadcastHashJoinExec binary physical operator is used when the right side of a join can be broadcast (which is controlled by spark.sql.autoBroadcastJoinThreshold, 10MB by default).

BroadcastExchangeExec unary physical operator is used to broadcast rows (of a relation) to worker nodes (to support BroadcastHashJoinExec).
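
As a side note, you can also request the broadcast explicitly with the broadcast function hint, regardless of the threshold. A sketch against the same df as in the question (dfWithParHinted is just a name for this sketch):

import org.apache.spark.sql.functions.broadcast

// Explicitly mark the right side of the join for broadcasting
val dfWithParHinted = df.as("df1").
  join(broadcast(df.as("df2")), $"df1.parentId" === $"df2.Id", "leftouter").
  select($"df1.*", $"df2.name" as "parentName")

dfWithParHinted.explain   // still BroadcastHashJoin + BroadcastExchange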

When BroadcastHashJoinExec is executed (to generate an RDD[InternalRow]), it creates a broadcast variable that in turn executes BroadcastExchangeExec (on a separate thread).

That's why Spark job 0, shown as run at ThreadPoolExecutor.java:1149, was run.

You can see the single Spark job 0 run if you execute the following:

// Just a single Spark job for the broadcast variable
val r = dfWithPar.rdd

That requires that the structured query is executed to produce an RDD that is then the target of the action to give the final result.


You would not have had the Spark job if you had not ended up with a broadcast join query.
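
You can verify that by disabling automatic broadcast joins and rebuilding the query so it gets planned again. A hedged sketch (dfWithParNoBroadcast and r2 are just names for this sketch, and the exact join chosen may vary):

// Disable automatic broadcast joins and re-create the query so it is re-planned
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)

val dfWithParNoBroadcast = df.as("df1").
  join(df.as("df2"), $"df1.parentId" === $"df2.Id", "leftouter").
  select($"df1.*", $"df2.name" as "parentName")

dfWithParNoBroadcast.explain   // typically a SortMergeJoin, no BroadcastExchange
val r2 = dfWithParNoBroadcast.rdd   // no Spark job: there is no broadcast to collect

// Restore the default threshold afterwards
spark.conf.unset("spark.sql.autoBroadcastJoinThreshold")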

RDD.take Operator

What I missed when I first answered the question was that the Dataset operators, i.e. show, take and head, in the end lead to RDD.take.

take(num: Int): Array[T] Take the first num elements of the RDD. It works by first scanning one partition, and use the results from that partition to estimate the number of additional partitions needed to satisfy the limit.

Note what take says: "It works by first scanning one partition, and use the results from that partition to estimate the number of additional partitions needed to satisfy the limit." That's the key to understanding the number of Spark jobs in your broadcast join query.

Every iteration (in the description above) is a separate Spark job, starting with the very first partition and scanning 4 times as many partitions in every following iteration:

// RDD.take
def take(num: Int): Array[T] = withScope {
  ...
  while (buf.size < num && partsScanned < totalParts) {
    ...
    val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
    ...
  }
}

Have a look at the following RDD.take with 21 rows.

// The other two Spark jobs
r.take(21)

You will get 2 Spark jobs as in your query.


Guess how many Spark jobs you will have if you executed dfWithPar.show(1).
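
One way to check your guess without counting entries in the web UI is to register a SparkListener and count job submissions. A sketch (jobCounter and countingListener are made-up names; listener events arrive asynchronously, so the count may lag a moment behind the action):

import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

val jobCounter = new AtomicInteger(0)
val countingListener = new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    jobCounter.incrementAndGet()   // one increment per submitted Spark job
  }
}
spark.sparkContext.addSparkListener(countingListener)

dfWithPar.show(1)    // show(1) => showString(1) => take(1 + 1)
Thread.sleep(1000)   // give the asynchronous listener bus a moment
println(s"Jobs triggered by show(1): ${jobCounter.get}")

spark.sparkContext.removeSparkListener(countingListener)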

Why Are Stages Identical?

Why is the stage view shown for both jobs identical? Below is a screenshot of the stage view of job id 1, which is exactly the same as that of job id 0. Why?

That's easy to answer since both Spark jobs come from RDD.take(21).

The first Spark job scans the first partition and, since that did not give enough rows, another Spark job was run to scan more partitions.

Jacek Laskowski answered Oct 01 '22