
Spark: UDF executed many times

I have a DataFrame that I transform with the following code:

import org.apache.spark.sql.functions.{col, udf}

def test(lat: Double, lon: Double) = {
  println(s"testing ${lat / lon}")
  Map("one" -> "one", "two" -> "two")
}

val testUDF = udf(test _)

df.withColumn("test", testUDF(col("lat"), col("lon")))
  .withColumn("test1", col("test.one"))
  .withColumn("test2", col("test.two"))

Checking the logs, I found out that the UDF is executed 3 times for each row. If I add a "test3" column extracted from "test.three", the UDF is executed once more.

Can someone explain why?

Can this be properly avoided (without caching the DataFrame after "test" is added, even though that works)?
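For reference, one way to see the duplication without relying on the println is to inspect the optimized plan (a sketch, reusing df, lat and lon from above):

df.withColumn("test", testUDF(col("lat"), col("lon")))
  .withColumn("test1", col("test.one"))
  .withColumn("test2", col("test.two"))
  .explain(true)
// The projections are collapsed in the optimized plan, so the expression
// testUDF(lat, lon) appears three times: once for "test" itself and once
// per extracted field, matching the three calls per row seen in the logs.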

asked Nov 04 '19 by Rolintocour


People also ask

Why are UDFs not good in Spark?

It is well known that the use of UDFs (User Defined Functions) in Apache Spark, especially with the Python API, can compromise application performance. For this reason, at Damavis we try to avoid their use as much as possible in favour of native functions or SQL.
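As an illustration of the native-function alternative (a sketch, under the assumption that the logic fits built-ins): the Map returned by test in the question above does not actually depend on lat or lon, so the same column could be built with the built-in map function and no UDF at all:

import org.apache.spark.sql.functions.{lit, map}

// map(...) builds a MapType column from alternating key/value expressions,
// entirely with native expressions, so no UDF is involved.
val withTest = df.withColumn("test",
  map(lit("one"), lit("one"), lit("two"), lit("two")))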

Why are PySpark UDFs slow?

The reason a Python UDF is slow is probably that PySpark UDFs are not implemented in the most optimized way: each row has to be serialized, shipped to a Python worker process, and the result serialized back to the JVM, which native functions avoid. Spark added a Python API in version 0.7, with support for user-defined functions.

Can a UDF return multiple columns?

Creating multiple top-level columns from a single UDF call isn't possible directly, but you can return a struct and then expand its fields.
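A minimal sketch of that struct approach (the case class and column names are illustrative, not from the question):

import org.apache.spark.sql.functions.{col, udf}

// Returning a case class makes the UDF produce a single struct column;
// "res.*" then expands its fields into top-level columns.
case class Pair(one: String, two: String)

val structUDF = udf((lat: Double, lon: Double) => Pair("one", "two"))

df.withColumn("res", structUDF(col("lat"), col("lon")))
  .select(col("*"), col("res.*"))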


1 Answer

If you want to avoid multiple calls to a UDF (which is useful, especially if the UDF is a bottleneck in your job), you can do it as follows:

val testUDF = udf(test _).asNondeterministic()

Basically, you tell Spark that your function is not deterministic, and Spark then makes sure it is called only once, because it is not safe to call it multiple times (each call could possibly return a different result).

Also be aware that this trick is not free: by doing it you put constraints on the optimizer. One side effect, for example, is that the Spark optimizer does not push filters through non-deterministic expressions, so you become responsible for the optimal position of the filters in your query, as sketched below.
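For example (a sketch; the predicate on lat is made up), filtering on the input columns before applying the non-deterministic UDF keeps the filter early, since the optimizer will no longer move it there on its own:

// Apply the filter explicitly before the non-deterministic UDF, because
// the optimizer will not push it through testUDF anymore.
df.filter(col("lat") > 0)
  .withColumn("test", testUDF(col("lat"), col("lon")))
  .withColumn("test1", col("test.one"))
  .withColumn("test2", col("test.two"))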

answered Sep 23 '22 by David Vrba