I understand that UDFs are a complete black box to Spark and no attempt will be made to optimize them. But will using the Column type and its functions listed in https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.sql.Column make the function "eligible" for the Catalyst Optimizer?
For example, UDF
to create a new column by adding 1
to existing column
val addOne = udf( (num: Int) => num + 1 )
df.withColumn("col2", addOne($"col1"))
The same function, using the Column type:
def addOne(col1: Column) = col1.plus(1)
df.withColumn("col2", addOne($"col1"))
or
spark.sql("select *, col1 + 1 from df")
Will there be any difference in performance between them?
It is well known that the use of UDFs (User Defined Functions) in Apache Spark, especially when using the Python API, can compromise application performance. For this reason, at Damavis we try to avoid their use as much as possible in favour of native functions or SQL.
Test results: RDDs outperformed DataFrames and Spark SQL for certain types of data processing. DataFrames and Spark SQL performed about the same, although for analysis involving aggregation and sorting Spark SQL had a slight advantage.
Native/SQL is generally the fastest as it has the most optimized code. Scala/Java does very well, narrowly beating SQL for the numeric UDF. The Scala Dataset API has some overhead, though it is not large. Python is slow, and while the vectorized UDF alleviates some of this, there is still a large gap compared to Scala or ...
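To make that advice concrete, here is a minimal sketch (the column name "amount" and the bucketing logic are assumptions, not taken from the question) of a typical UDF rewritten with the built-in when/otherwise functions, which keeps the expression visible to Catalyst:

import org.apache.spark.sql.functions.{col, udf, when}

// UDF version: the lambda body is opaque to the optimizer
val bucketUdf = udf( (amount: Double) => if (amount >= 100.0) "high" else "low" )
val withUdf = df.withColumn("bucket", bucketUdf(col("amount")))

// Native version: the same logic expressed with built-in when/otherwise,
// so Catalyst sees the full expression and can optimize around it
val withNative = df.withColumn("bucket", when(col("amount") >= 100.0, "high").otherwise("low"))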
Over a simple in-memory set of 6 records, the 2nd and 3rd options yield roughly the same performance of ~70 milliseconds, which is much better than the first option (using the UDF: ~0.7 seconds).
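For reference, a minimal sketch of the assumed test setup (the actual 6-record DataFrame is not shown here, so the values below are made up):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("udf-vs-column").getOrCreate()
import spark.implicits._

// Hypothetical 6-record test DataFrame with a single integer column "col1"
val df = Seq(1, 2, 3, 4, 5, 6).toDF("col1")

With that in place, the three options compared were: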
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.udf

// Option 1: UDF (a black box to Catalyst)
val addOne = udf( (num: Int) => num + 1 )
val res1 = df.withColumn("col2", addOne($"col1"))
res1.show()
//res1.explain()

// Option 2: a Scala function built from Column expressions (transparent to Catalyst)
def addOne2(col1: Column) = col1.plus(1)
val res2 = df.withColumn("col2", addOne2($"col1"))
res2.show()
//res2.explain()

// Option 3: plain SQL (the DataFrame must be registered as a temp view first)
df.createOrReplaceTempView("df")
val res3 = spark.sql("select *, col1 + 1 from df")
res3.show()
Timeline: the first two stages are for the UDF option, the next two for the Column-based option, and the last two for Spark SQL:
In all three approaches the shuffle write was exactly the same (354.0 B), whereas the major difference in duration was the executor compute time when using the UDF:
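If you want to verify the difference beyond wall-clock timings, you can inspect the plans of the three results directly; a small sketch using the res1/res2/res3 values from above:

// The UDF appears as an opaque UDF(col1) call in its plan, while the Column-based
// and SQL versions compile down to the same native (col1 + 1) projection.
res1.explain(true)
res2.explain(true)
res3.explain(true)

// Or compare the optimized logical plans programmatically:
println(res2.queryExecution.optimizedPlan)
println(res3.queryExecution.optimizedPlan)

In the UDF plan the addition stays hidden inside the UDF call, so Catalyst cannot fold or rearrange it, whereas the other two produce effectively identical plans.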