What is the meaning of the Project node in Spark's execution plan?
I have a plan containing the following:
+- Project [dm_country#population#6a1ad864-235f-4761-9a6d-0ca2a2b40686#834, dm_country#population#country#839, population#17 AS dm_country#population#population#844]
   +- Project [dm_country#population#6a1ad864-235f-4761-9a6d-0ca2a2b40686#834, country#12 AS dm_country#population#country#839, population#17]
      +- Project [6a1ad864-235f-4761-9a6d-0ca2a2b40686#22 AS dm_country#population#6a1ad864-235f-4761-9a6d-0ca2a2b40686#834, country#12, population#17]
         +- RepartitionByExpression [country#12], 1000
            +- Union
               :- Project [ind#7 AS 6a1ad864-235f-4761-9a6d-0ca2a2b40686#22, country#12, population#17]
               :  +- Project [ind#7, country#12, population#2 AS population#17]
               :     +- Project [ind#7, country#1 AS country#12, population#2]
               :        +- Project [ind#0 AS ind#7, country#1, population#2]
               :           +- Relation[ind#0,country#1,population#2] JDBCRelation(schema_dbadmin.t_350) [numPartitions=100]
               +- LogicalRDD [ind#45, country#46, population#47]
In Spark, the query execution plan is the entry point to understanding how a Spark query is executed. It matters most when debugging or investigating heavy workloads, or when a job takes a long time to run.
NOTE: Since the plan uses the RepartitionByExpression node, it must be a logical query plan.
A Project node in a logical query plan stands for the Project unary logical operator and is created whenever you use some kind of projection, explicitly or implicitly.
Quoting Wikipedia's Projection (relational algebra):
In practical terms, it can be roughly thought of as picking a subset of all available columns.
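To make the quoted definition concrete, here is a minimal sketch in plain Scala (no Spark required). The `Row` alias, the `project` helper and the sample rows are hypothetical and made up for this illustration; only the column names are borrowed from the plan in the question.

```scala
object ProjectionSketch {
  // Assumption for this sketch: a row is a map from column name to value.
  type Row = Map[String, Any]

  // Relational projection: keep only the requested columns of every row.
  def project(rows: Seq[Row], columns: Set[String]): Seq[Row] =
    rows.map(row => row.filter { case (name, _) => columns.contains(name) })

  // Made-up sample data, not taken from the question.
  val countries: Seq[Row] = Seq(
    Map("ind" -> 1, "country" -> "PL", "population" -> 38),
    Map("ind" -> 2, "country" -> "DE", "population" -> 83)
  )

  // Every row is kept; only the set of columns shrinks.
  val projected: Seq[Row] = project(countries, Set("country", "population"))

  def main(args: Array[String]): Unit = projected.foreach(println)
}
```

The number of rows is unchanged and only the set of columns shrinks, which is what a Project node expresses in a plan.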
A Project node can appear in a logical query plan explicitly for the following:
- Dataset operators, i.e. joinWith, select, unionByName
- KeyValueGroupedDataset operators, i.e. keys, mapValues
- SQL SELECT queries
A Project node can also appear during the analysis and optimization phases.
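One way to check which operators introduce a Project is to count the Project nodes in the analyzed logical plan. The sketch below assumes Spark SQL is on the classpath; the object name `ProjectOperatorsDemo` and the helper `projectCount` are made up for this illustration, and the remark about `withColumnRenamed` reflects how it behaves in the Spark versions I am aware of rather than a documented guarantee.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.Project

object ProjectOperatorsDemo {
  // Local session for illustration only; spark-shell already provides `spark`.
  lazy val spark: SparkSession =
    SparkSession.builder().master("local[1]").appName("project-demo").getOrCreate()

  // Hypothetical helper: number of Project nodes in the analyzed logical plan.
  def projectCount(ds: Dataset[_]): Int =
    ds.queryExecution.analyzed.collect { case p: Project => p }.size

  def main(args: Array[String]): Unit = {
    val nums = spark.range(4)
    println(projectCount(nums))                              // plain Range, no projection requested
    println(projectCount(nums.select("id")))                 // explicit projection via select
    println(projectCount(nums.withColumnRenamed("id", "n"))) // implicit: a rename is a projection with an alias
    spark.stop()
  }
}
```

The chain of Project nodes with `AS` aliases in the question's plan looks like the result of several such renaming steps, although that is a guess from the plan alone.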
In Spark SQL, the Dataset API gives the high-level operators, e.g. select, filter or groupBy, that ultimately build a Catalyst logical plan of a structured query. In other words, this simple-looking Dataset.select operator just creates a LogicalPlan with a Project node.
val query = spark.range(4).select("id")
scala> println(query.queryExecution.logical)
'Project [unresolvedalias('id, None)]
+- Range (0, 4, step=1, splits=Some(8))
(You could have used query.explain(extended = true) for the above, but that would have given you all 4 plans, which may have hidden the point.)
You could have a look at the code of the Dataset.select operator.
def select(cols: Column*): DataFrame = withPlan {
  Project(cols.map(_.named), logicalPlan)
}
This simple-looking select operator is a mere wrapper around Catalyst operators to build a Catalyst tree of logical operators that gives a logical plan.
NOTE What's nice about Spark SQL's Catalyst is that it uses this recursive LogicalPlan abstraction that represents a logical operator or a tree of logical operators.
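The recursive idea can be sketched with a toy plan type in plain Scala. The `Plan`, `Relation`, `Filter` and `Project` types below are simplified stand-ins invented for this illustration, not Catalyst's real classes.

```scala
object PlanTreeSketch {
  // Simplified stand-ins for logical operators (not the real Catalyst classes).
  sealed trait Plan { def children: Seq[Plan] }
  final case class Relation(name: String) extends Plan {
    val children: Seq[Plan] = Nil
  }
  final case class Filter(condition: String, child: Plan) extends Plan {
    val children: Seq[Plan] = Seq(child)
  }
  final case class Project(columns: Seq[String], child: Plan) extends Plan {
    val children: Seq[Plan] = Seq(child)
  }

  // Every operator is itself a Plan, so one recursive function walks any tree.
  def countNodes(plan: Plan): Int = 1 + plan.children.map(countNodes).sum

  // Renders the tree one operator per line, indenting children under parents.
  def treeString(plan: Plan, depth: Int = 0): String = {
    val label = plan match {
      case Relation(name)   => s"Relation $name"
      case Filter(cond, _)  => s"Filter ($cond)"
      case Project(cols, _) => s"Project [${cols.mkString(", ")}]"
    }
    val prefix = if (depth == 0) "" else ("   " * (depth - 1)) + "+- "
    val lines = Seq(prefix + label) ++ plan.children.map(treeString(_, depth + 1))
    lines.mkString("\n")
  }

  // A single operator and a whole tree have the same type: Plan.
  val plan: Plan = Project(Seq("country"), Filter("population > 10", Relation("t")))

  def main(args: Array[String]): Unit = {
    println(treeString(plan))
    println(countNodes(plan))
  }
}
```

A single `Relation` and the whole three-operator tree are both values of type `Plan`, which is the property the note refers to.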
NOTE The same applies to good ol' SQL: after being parsed, the SQL text is transformed into an AST of logical operators. See the example below.
Project can come and go, since projection only determines which columns appear in the output, so the node may or may not appear in your plans and queries.
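As a rough illustration of why a Project can disappear, the toy rule below drops a Project that outputs exactly the columns its child already produces. It is a sketch in plain Scala with made-up types (`Plan`, `NumRange`, `Project`) and a made-up function `removeRedundantProjects`; it only mimics the spirit of what an optimizer may do and is not Catalyst's actual rule.

```scala
object RedundantProjectSketch {
  // Toy plan nodes invented for this sketch; each node knows its output columns.
  sealed trait Plan { def output: Seq[String] }
  final case class NumRange(start: Long, end: Long) extends Plan {
    val output: Seq[String] = Seq("id")
  }
  final case class Project(columns: Seq[String], child: Plan) extends Plan {
    val output: Seq[String] = columns
  }

  // Bottom-up rewrite: a Project that keeps all of its child's columns,
  // in the same order, changes nothing and can be replaced by the child.
  def removeRedundantProjects(plan: Plan): Plan = plan match {
    case Project(columns, child) =>
      val newChild = removeRedundantProjects(child)
      if (columns == newChild.output) newChild else Project(columns, newChild)
    case leaf => leaf
  }

  val analyzed: Plan = Project(Seq("id"), NumRange(0, 4))
  val optimized: Plan = removeRedundantProjects(analyzed)

  def main(args: Array[String]): Unit = {
    println(analyzed)
    println(optimized)
  }
}
```

This mirrors the extended explain output at the end of this answer, where the analyzed plan has a Project over Range and the optimized plan is just Range.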
You can use Spark SQL's Catalyst DSL (in the org.apache.spark.sql.catalyst.dsl package object) for constructing Catalyst data structures using Scala implicit conversions. That could be very useful if you are into Spark testing.
scala> spark.version
res0: String = 2.3.0-SNAPSHOT
import org.apache.spark.sql.catalyst.dsl.plans._ // <-- gives table and select
import org.apache.spark.sql.catalyst.dsl.expressions.star
val plan = table("a").select(star())
scala> println(plan.numberedTreeString)
00 'Project [*]
01 +- 'UnresolvedRelation `a`
scala> spark.range(4).createOrReplaceTempView("nums")
scala> spark.sql("SHOW TABLES").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|        |     nums|       true|
+--------+---------+-----------+
scala> spark.sql("SELECT * FROM nums").explain
== Physical Plan ==
*Range (0, 4, step=1, splits=8)
scala> spark.sql("SELECT * FROM nums").explain(true)
== Parsed Logical Plan ==
'Project [*]
+- 'UnresolvedRelation `nums`
== Analyzed Logical Plan ==
id: bigint
Project [id#40L]
+- SubqueryAlias nums
   +- Range (0, 4, step=1, splits=Some(8))
== Optimized Logical Plan ==
Range (0, 4, step=1, splits=Some(8))
== Physical Plan ==
*Range (0, 4, step=1, splits=8)