Here is the situation:
We have a module where we define some functions that return a pyspark.sql.DataFrame (DF). To get those DFs we use some pyspark.sql.functions.udf defined either in the same file or in helper modules. When we actually write a job for PySpark to execute, we only import the functions from our modules (we provide a .zip file to --py-files) and then just save the DataFrame to HDFS.
The issue is that when we do this, the udf functions freeze our job. The nasty fix we found was to define the udf functions inside the job itself and pass them to the imported functions from our module.
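Roughly, that workaround looks like this in the job script (module, function, and column names here are just illustrative):

# job.py -- submitted with spark-submit, with our module in the --py-files zip
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

from my_module import build_df  # illustrative name for one of our functions

def clean(value):
    return value.strip().lower()

if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()
    # the udf is only created here, inside the running job
    clean_udf = udf(clean, StringType())
    df = build_df(spark, clean_udf)
    df.write.parquet("hdfs:///some/output/path")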
The other fix I found here is to define a class:

from pyspark.sql.functions import udf

class Udf(object):
    def __init__(s, func, spark_type):
        s.func, s.spark_type = func, spark_type

    def __call__(s, *args):
        return udf(s.func, s.spark_type)(*args)
Then I use this class to define my udfs in the module. This works!
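For example, in the module I can now do something like this at import time (the helper and column names are just illustrative):

# my_module.py -- shipped to the cluster via --py-files
from pyspark.sql.types import StringType

def _to_upper(value):
    return value.upper()

# safe at import time: the real udf is only built when the wrapper is called,
# i.e. once the job is running
to_upper_udf = Udf(_to_upper, StringType())

def build_df(spark, path):
    df = spark.read.parquet(path)
    return df.withColumn("name_upper", to_upper_udf(df["name"]))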
Can anybody explain why we have this problem in the first place? And why this fix (the last one with the class definition) works?
Additional info: PySpark 2.1.0, deploying the job on YARN in cluster mode.
Thanks!
The accepted answer to the link you posted above says, "My work around is to avoid creating the UDF until Spark is running and hence there is an active SparkContext." It looks like your issue is with serializing the UDF: when the udf is created at module import time, there is no active SparkContext yet, so the job hangs.
Make sure the UDF functions in your helper classes are static methods or global functions, and define the udf inside the public functions that you import elsewhere:
from pyspark.sql.functions import udf

class Helperclass(object):
    @staticmethod
    def my_udf_todo(...):
        ...

def public_function_that_is_imported_elsewhere(...):
    # the udf is only created once this function is called at run time,
    # not when the module is imported
    todo_udf = udf(Helperclass.my_udf_todo, RETURN_SCHEMA)
    ...
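From the job script you then only call the public function once Spark is up, for example (the module name is illustrative, and I'm assuming here that the function takes the SparkSession as an argument):

# job.py
from pyspark.sql import SparkSession
from helper_module import public_function_that_is_imported_elsewhere

if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()
    # by the time the udf is built inside the imported function, an active
    # SparkContext already exists
    df = public_function_that_is_imported_elsewhere(spark)
    df.write.parquet("hdfs:///some/output/path")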