PySpark user-defined functions inside of a class

I am trying to create a Spark UDF inside a Python class, meaning that one of the methods of the class is the UDF. I am getting the error "PicklingError: Could not serialize object: TypeError: can't pickle _MovedItems objects".

Environment: Azure Databricks (DBR version 6.1 Beta). Code execution: in the built-in notebook. Python version: 3.5. Spark version: 2.4.4.

I have tried defining the UDF outside of the class in a separate cell, and the UDF works. I do not want to write code like that; I need to follow OOP principles and would like to keep it structured. I have tried everything I could find on Google, and none of it helped. In fact, I could not even find any information about the error I am getting.

from pyspark.sql.functions import struct, udf
from pyspark.sql.types import LongType


class phases():
  def __init__(self, each_mp_pair_df_as_arg, unique_mp_pair_df_as_arg):
    print("Inside the constructor of Class phases ")

    # I need the below 2 variables to be used in my UDF, so I am trying to put them in a class
    self.each_mp_pair_phases_df = each_mp_pair_df_as_arg
    self.unique_mp_pair_phases_df = unique_mp_pair_df_as_arg

  # This is the UDF.
  def phases_commence(self, each_row):
    print(each_row)
    return 1

  # This is the function that registers the UDF.
  def initiate_the_phases_on_the_major_track_segment(self):
    print("Inside the 'initiate_the_phases_on_the_major_track_segment()'")

    # Registering the UDF; note that the bound method self.phases_commence is passed here
    self.phases_udf = udf(self.phases_commence, LongType())
    new_df = self.each_mp_pair_phases_df.withColumn("status", self.phases_udf(struct([self.each_mp_pair_phases_df[x] for x in self.each_mp_pair_phases_df.columns])))
    # display() is the Databricks notebook helper
    display(new_df)


# This is a function in a different notebook that creates an object of the class shown above and calls the method that registers the UDF.
def getting_ready_for_the_phases(each_mp_pair_df_as_arg, unique_mp_pair_df_as_arg):

  phase_obj = phases(each_mp_pair_df_as_arg, unique_mp_pair_df_as_arg)
  phase_obj.initiate_the_phases_on_the_major_track_segment()

The error message is: PicklingError: Could not serialize object: TypeError: can't pickle _MovedItems objects

Chinivar Basu asked Oct 16 '19 15:10


1 Answer

Your function needs to be static in order to be used as a UDF. I looked for documentation that explains this well, but couldn't really find any.

Basically (maybe not 100% accurate; corrections are appreciated), when you define a UDF it gets pickled and copied to each executor automatically. self.phases_commence is a bound method, so pickling it means pickling the instance it is bound to as well, including the DataFrames stored on self, and those generally cannot be serialized. A static method carries no reference to the instance, so only the function itself has to be pickled. Have a look at this post for workarounds other than static methods.
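The effect can be seen without Spark. The following is only a minimal sketch using the standard pickle module (PySpark itself serializes UDFs with its bundled cloudpickle, so the exact exception will differ). Holder, resource, bound_version, static_version and try_pickle are hypothetical names made up for this illustration, and the lock is just a stand-in for an unpicklable attribute such as a DataFrame.

```python
import pickle
import threading
import types


class Holder:
    """Stand-in for the class in the question; `resource` plays the role of the DataFrame."""

    def __init__(self):
        # A lock is used only because it is a well-known unpicklable object.
        self.resource = threading.Lock()

    def bound_version(self, age):
        return age + 3

    @staticmethod
    def static_version(age):
        return age + 3


def try_pickle(obj):
    """Return None on success, or the exception raised while pickling obj."""
    try:
        pickle.dumps(obj)
        return None
    except Exception as exc:  # the concrete exception type depends on the environment
        return exc


holder = Holder()

# A bound method reduces to (getattr, (instance, name)), so the instance
# and everything stored on it becomes part of the pickled payload.
reducer, args = holder.bound_version.__reduce__()
print(reducer is getattr, args[0] is holder, args[1])

# Pickling the bound method therefore fails, because the instance cannot be pickled.
print(type(try_pickle(holder.bound_version)).__name__)

# Accessed through the class, the static method is a plain function
# with no instance attached to it.
print(isinstance(Holder.static_version, types.FunctionType))
print(hasattr(Holder.static_version, "__self__"))
```

The point of the sketch is only that the bound method drags holder along while the static method does not; that is the difference the static-method fix below relies on.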

import pyspark.sql.functions as F
import pyspark.sql.types as T


class Phases():
  def __init__(self, df1):
    print("Inside the constructor of Class phases ")

    self.df1 = df1
    # The UDF is created from the static method, so no instance is captured.
    self.phases_udf = F.udf(Phases.phases_commence, T.IntegerType())

  # This is the UDF.
  @staticmethod
  def phases_commence(age):
    age = age + 3
    return age

  # This is the function that applies the UDF.
  def doSomething(self):
    print("Inside the doSomething")
    self.df1 = self.df1.withColumn('AgeP2', self.phases_udf(F.col('Age')))

l = [(1, 10, 'F'),
     (2, 2, 'M'),
     (2, 10, 'F'),
     (2, 3, 'F'),
     (3, 10, 'M')]

columns = ['id', 'Age', 'Gender']

df = spark.createDataFrame(l, columns)

bla = Phases(df)
bla.doSomething()
bla.df1.show()

Output:

Inside the constructor of Class phases
Inside the doSomething
+---+---+------+-----+ 
| id|Age|Gender|AgeP2| 
+---+---+------+-----+ 
|  1| 10|     F|   13| 
|  2|  2|     M|    5| 
|  2| 10|     F|   13| 
|  2|  3|     F|    6| 
|  3| 10|     M|   13| 
+---+---+------+-----+
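If the UDF needs a value that lives on the instance, as in the question, one possible workaround is to build the function from a small factory so that it closes over the plain value only and never sees self. This is a sketch under assumptions, not something tested on DBR 6.1: make_add_offset, PhasesWithOffset, offset and with_shifted_age are hypothetical names, and the captured value has to be an ordinary picklable Python object, since a DataFrame cannot be used from inside a UDF.

```python
def make_add_offset(offset):
    """Return a plain function that closes over `offset` only (no `self`)."""
    def add_offset(age):
        # Pass nulls through unchanged instead of failing on None + int.
        return None if age is None else age + offset
    return add_offset


class PhasesWithOffset:
    """Hypothetical variant of the Phases class that keeps a setting on the instance."""

    def __init__(self, df, offset=3):
        self.df = df
        self.offset = offset

    def with_shifted_age(self):
        # Imported here so the helper above stays usable without Spark installed.
        from pyspark.sql import functions as F
        from pyspark.sql import types as T

        # Only the small closure is handed to Spark, not the bound method or self.
        add_offset_udf = F.udf(make_add_offset(self.offset), T.IntegerType())
        return self.df.withColumn("AgeP2", add_offset_udf(F.col("Age")))
```

With the df from the example above, PhasesWithOffset(df, offset=3).with_shifted_age().show() should give the same AgeP2 column as the static-method version.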
cronoik answered Oct 13 '22 11:10