
Using Grouped Map Pandas UDFs with arguments

I want to use data.groupby.apply() to apply a function to each row of my PySpark DataFrame per group.

I used the Grouped Map Pandas UDFs. However, I can't figure out how to pass another argument to my function.

I tried using the argument as a global variable, but the function doesn't recognize it (my argument is a PySpark DataFrame).
I also tried the solutions proposed in this question (for a pandas DataFrame): Use Pandas groupby() + apply() with arguments

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def calc_diff(key, data, interval):
    interval_df = interval.filter(interval["var"] == key).toPandas()
    for value in interval_df:
        # Apply some operations
        ...

return Data.groupBy("msn").apply(calc_diff, ('arg1'))

Or

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def calc_diff(key, data, interval):
    interval_df = interval.filter(interval["var"] == key).toPandas()
    for value in interval_df:
        # Apply some operations
        ...

return Data.groupBy("msn").apply(lambda x: calc_diff(x, 'arg1'))

But I get the error:

ValueError: Invalid function: pandas_udfs with function type GROUPED_MAP must take either one argument (data) or two arguments (key, data).

Could anyone help me with the above issue?

Thanks

asked Apr 30 '19 by Yasmine


3 Answers

I like @hwrd's idea, but instead would make it a generator pattern, which makes it easier to integrate, as in @Feng's example:

def function_generator(key):
    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def function(interval):
        interval_df = interval.filter(interval["var"] == key).toPandas()
        for value in interval_df:
            # Apply some operations
            ...
    return function

calc_diff = function_generator('arg1')
output = Data.groupBy("msn").apply(calc_diff)

answered Oct 08 '22 by sifta


You can create the pandas UDF inside your function, so that the function arguments are known to it at the time of its creation. (Or you can import functools and use partial function evaluation to do the same thing; a sketch of that variant follows the example below.) Here is the example from the PySpark documentation, modified to pass in some parameters:

from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))


def my_function(df, by="id", column="v", value=1.0):
    schema = "{} long, {} double".format(by, column)

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def subtract_value(pdf):
        # pdf is a pandas.DataFrame
        v = pdf[column]
        g = pdf[by]
        return pdf.assign(v = v - g * value)

    return df.groupby(by).apply(subtract_value)

my_function(df, by="id", column="v", value=2.0).show()
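For the functools route mentioned above, here is a minimal sketch. It assumes pandas_udf will accept a functools.partial whose only remaining parameter is the grouped pandas DataFrame; subtract_value and its column/value parameters are illustrative names, not part of the PySpark API, and the df used is the one created above. If your Spark version rejects the partial, the closure version above is the safer choice.

import functools

from pyspark.sql.functions import pandas_udf, PandasUDFType

def subtract_value(pdf, column="v", value=1.0):
    # pdf is the pandas.DataFrame Spark hands to the grouped-map UDF;
    # column and value are the extra arguments bound up front via partial.
    return pdf.assign(**{column: pdf[column] - pdf["id"] * value})

# Bind the extra arguments first, then wrap the resulting partial as the grouped-map UDF.
subtract_two = pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)(
    functools.partial(subtract_value, column="v", value=2.0)
)

df.groupby("id").apply(subtract_two).show()
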
answered Oct 08 '22 by hwrd


I think you could do something like this:

def myfun(data, key, interval):
    # Apply some operations
    return something

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def myfun_udf(data):
    return myfun(data=data, key=mykey, interval=myinterval)

mykey = 1
myinterval = 2

Data.groupBy("msn").apply(myfun_udf)

answered Oct 08 '22 by Feng