
UDFs vs Spark SQL vs column expressions: performance optimization

I understand that UDFs are a complete black box to Spark, and no attempt will be made to optimize them. But will using the Column type and its functions listed in (https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.sql.Column) make the function "eligible" for the Catalyst optimizer?

For example, UDF to create a new column by adding 1 to existing column

import org.apache.spark.sql.functions.udf

val addOne = udf( (num: Int) => num + 1 )
df.withColumn("col2", addOne($"col1"))

The same function, using the Column type:

import org.apache.spark.sql.Column

def addOne(col1: Column) = col1.plus(1)
df.withColumn("col2", addOne($"col1"))

or

// assumes df has been registered as a temporary view named "df"
spark.sql("select *, col1 + 1 from df")

Will there be any difference in performance between them?

asked Aug 03 '17 by vdep

People also ask

Why should we not use UDFs in Spark?

It is well known that the use of UDFs (user-defined functions) in Apache Spark, especially through the Python API, can compromise application performance. For this reason, at Damavis we try to avoid them as much as possible in favour of native functions or SQL.
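As a quick illustration of that preference (a hedged sketch, not part of the original answer, assuming a DataFrame df with a string column name), a built-in function such as upper stays visible to Catalyst, while the equivalent UDF does not:

import org.apache.spark.sql.functions.{col, udf, upper}

// preferred: built-in function, evaluated as a native Catalyst expression
val native = df.withColumn("name_upper", upper(col("name")))

// avoided where possible: the lambda inside the UDF is opaque to the optimizer
val toUpperUdf = udf((s: String) => s.toUpperCase)
val withUdf = df.withColumn("name_upper", toUpperUdf(col("name")))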

Which is better, Spark SQL or Spark DataFrames?

Test results: RDDs outperformed DataFrames and Spark SQL for certain types of data processing. DataFrames and Spark SQL performed about the same, although in analysis involving aggregation and sorting Spark SQL had a slight advantage.

Is Spark SQL faster than Spark Scala?

Native/SQL is generally the fastest, as it has the most optimized code. Scala/Java does very well, narrowly beating SQL for the numeric UDF. The Scala Dataset API has some overhead, although it is not large. Python is slow, and while the vectorized UDF alleviates some of this, there is still a large gap compared to Scala or ...
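For reference, the typed Dataset route mentioned above looks roughly like this (a sketch that assumes a single-column integer DataFrame and spark.implicits._ in scope); the lambda passed to map is opaque to Catalyst, which is where the extra overhead comes from:

import spark.implicits._

// typed Dataset API: type-safe, but the map lambda is a black box to the optimizer
val ds = df.select($"col1").as[Int]
val plusOneTyped = ds.map(_ + 1)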


1 Answer
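For context, a minimal setup along these lines could reproduce the comparison (the answer does not show the actual test data, so the DataFrame below is an assumption):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("udf-vs-expressions").master("local[*]").getOrCreate()
import spark.implicits._

// a small in-memory DataFrame with one integer column, registered as a view for the SQL variant
val df = Seq(1, 2, 3, 4, 5, 6).toDF("col1")
df.createOrReplaceTempView("df")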

Over a simple in-memory set of 6 records, the second and third options yield roughly the same performance (~70 milliseconds), which is much better than the first (the UDF, at ~0.7 seconds):

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.udf

// option 1: UDF
val addOne = udf( (num: Int) => num + 1 )
val res1 = df.withColumn("col2", addOne($"col1"))
res1.show()
//res1.explain()

// option 2: Column expression
def addOne2(col1: Column) = col1.plus(1)
val res2 = df.withColumn("col2", addOne2($"col1"))
res2.show()
//res2.explain()

// option 3: Spark SQL
val res3 = spark.sql("select *, col1 + 1 from df")
res3.show()
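Connecting this back to the Catalyst question: comparing the physical plans (a sketch; the exact output varies by Spark version) shows that the second and third options compile to the same native col1 + 1 expression, while the UDF remains an opaque call:

res1.explain()  // plan shows UDF(col1): Catalyst cannot look inside the lambda
res2.explain()  // plan shows (col1 + 1) as a native, codegen-friendly expression
res3.explain()  // essentially the same plan as res2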

Timeline: the first two stages are for the UDF option, the next two for the Column-expression option, and the last two for the Spark SQL option.

In all three approaches the shuffle writes were exactly the same (354.0 B), whereas the major difference in duration was the executor compute time when using the UDF.

answered Oct 19 '22 by Yosi Dahari