 

Using a module with udf defined inside freezes pyspark job - explanation?

Here is the situation:

We have a module where we define some functions that return pyspark.sql.DataFrame (DF). To build those DFs we use some pyspark.sql.functions.udf defined either in the same file or in helper modules. When we actually write a job for pyspark to execute, we only import functions from the modules (we provide a .zip file via --py-files) and then just save the dataframe to HDFS.
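For illustration, a minimal sketch of the kind of module layout I mean (all names here are hypothetical):

# helpers.py -- shipped to the cluster via --py-files
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# udf() runs at import time, before the job has an active SparkContext
normalize = udf(lambda s: s.strip().lower(), StringType())

def build_frame(df):
    return df.withColumn("name", normalize(df["name"]))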

The issue is that when we do this, the udf freezes our job. The nasty fix we found was to define the udf functions inside the job itself and pass them into the functions imported from our module. The other fix I found here is to define a class:

from pyspark.sql.functions import udf


class Udf(object):
    def __init__(self, func, spark_type):
        self.func, self.spark_type = func, spark_type

    def __call__(self, *args):
        # udf() is deferred until the wrapper is called, i.e. at query-building time
        return udf(self.func, self.spark_type)(*args)

Then I use this to define my udfs in the module. This works!
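For example, the wrapper is then used in the module like this (names are again hypothetical):

from pyspark.sql.types import StringType

# created at import time, but udf() itself is deferred until the first call
normalize = Udf(lambda s: s.strip().lower(), StringType())

def build_frame(df):
    return df.withColumn("name", normalize(df["name"]))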

Can anybody explain why we have this problem in the first place? And why this fix (the last one with the class definition) works?

Additional info: PySpark 2.1.0, deploying the job on YARN in cluster mode.

Thanks!

asked Jul 14 '17 by Vaidas Armonas

People also ask

How does a PySpark UDF work?

A PySpark UDF is a User Defined Function used to build reusable column logic in Spark. Once created, a UDF can be reused across multiple DataFrames and in SQL (after registering it). The default return type of udf() is StringType.
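A minimal sketch of the usual workflow (column and function names are made up):

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("hello",), ("spark",)], ["word"])

# explicit return type; without one, udf() defaults to StringType
str_len = udf(lambda s: len(s) if s is not None else None, IntegerType())
df.select(str_len("word").alias("len")).show()

# register it to make it usable from SQL as well
spark.udf.register("str_len", lambda s: len(s) if s is not None else None, IntegerType())
spark.sql("SELECT str_len('hello') AS len").show()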

How do you avoid using a UDF in Spark?

To avoid such a UDF, we can refer to a native function called filter. This function was not available in the pyspark.sql.functions package until version 3.1, so in Spark 2 it has to be expressed differently.
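A sketch of both variants, assuming a column nums holding an array of integers:

from pyspark.sql import functions as F

# Spark >= 3.1: native higher-order function, no Python round trip
df = df.withColumn("evens", F.filter("nums", lambda x: x % 2 == 0))

# Spark 2.4 - 3.0: the same function is only reachable through a SQL expression
df = df.withColumn("evens", F.expr("filter(nums, x -> x % 2 = 0)"))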

Why are PySpark UDFs slow?

Python UDFs are slow mostly because PySpark UDFs are not implemented in the most optimized way: every call has to cross the JVM/Python boundary. Spark added a Python API in version 0.7, with support for user-defined functions.
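A small contrast that shows where the overhead comes from, assuming a string column word:

from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Python UDF: every row is serialized, shipped to a Python worker, and back
upper_udf = udf(lambda s: s.upper() if s is not None else None, StringType())
df.select(upper_udf("word"))

# built-in function: stays inside the JVM and is optimized by Catalyst
df.select(F.upper("word"))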

Can a PySpark UDF return multiple columns?

The short answer is: No. Using a PySpark UDF requires Spark to serialize the Scala objects, run a Python process, deserialize the data in Python, run the function, serialize the results, and deserialize them in Scala. This causes a considerable performance penalty, so I recommend avoiding UDFs in PySpark where possible.
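A common workaround, consistent with that "No": return a single struct column and split it afterwards. A sketch with made-up names:

from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("head", StringType()),
    StructField("length", IntegerType()),
])

# still one return column -- a struct -- expanded into columns by the final select
split_word = udf(lambda s: (s[0], len(s)), schema)
df.select(split_word("word").alias("parts")).select("parts.*")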


1 Answer

The accepted answer at the link you posted says, "My work around is to avoid creating the UDF until Spark is running and hence there is an active SparkContext." It looks like your issue is with serializing the UDF: at import time there is no active SparkContext yet.

Make sure the functions wrapped by your UDFs in helper classes are static methods or global functions, and define the udf itself inside the public functions that you import elsewhere:

from pyspark.sql.functions import udf


class Helperclass(object):
    @staticmethod
    def my_udf_todo(...):
        ...

    @staticmethod
    def public_function_that_is_imported_elsewhere(...):
        # udf() is only called here, at run time, when an active SparkContext exists
        todo_udf = udf(Helperclass.my_udf_todo, RETURN_SCHEMA)
        ...
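A job that imports this would then look roughly like the following (module and path names are hypothetical):

# job.py -- submitted with --py-files helpers.zip
from helpers import Helperclass

result_df = Helperclass.public_function_that_is_imported_elsewhere(input_df)
result_df.write.parquet("hdfs:///output/path")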
answered Oct 02 '22 by greenie