I want to use data.groupby().apply() to apply a function to each group of my PySpark DataFrame, using the Grouped Map Pandas UDFs. However, I can't figure out how to pass another argument to my function.
I tried passing the argument as a global variable, but the function doesn't recognize it (my argument is a PySpark DataFrame).
I also tried the solutions proposed in this question (for a pandas DataFrame): Use Pandas groupby() + apply() with arguments
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def calc_diff(key, data, interval):
    interval_df = interval.filter(interval["var"] == key).toPandas()
    for value in interval_df:
        # Apply some operations

Data.groupBy("msn").apply(calc_diff, ('arg1'))
Or
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def calc_diff(key, data, interval):
    interval_df = interval.filter(interval["var"] == key).toPandas()
    for value in interval_df:
        # Apply some operations

Data.groupBy("msn").apply(lambda x: calc_diff(x, 'arg1'))
But I get the error:
ValueError: Invalid function: pandas_udfs with function type GROUPED_MAP must take either one argument (data) or two arguments (key, data).
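For reference, the only signatures GROUPED_MAP accepts look like this (a minimal sketch based on the error message; the names are placeholders):

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def one_arg(data):        # data: one group as a pandas.DataFrame
    return data

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def two_args(key, data):  # key: the grouping key tuple, data: the group
    return data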
Could anyone help me with this issue?
Thanks
I like @hwrd's idea, but instead I would make it a function factory (a closure over the extra argument), which is easier to integrate, as in @Feng's example:
def function_generator(key):
    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def function(data):
        # data is already a pandas.DataFrame for one group, so filter it
        # with pandas indexing rather than PySpark's filter()/toPandas()
        interval_df = data[data["var"] == key]
        for value in interval_df:
            # Apply some operations
            ...
        # A GROUPED_MAP UDF must return a pandas.DataFrame matching schema
        return interval_df
    return function

calc_diff = function_generator('arg1')
output = Data.groupBy("msn").apply(calc_diff)
You can create the pandas UDF inside your function, so that the function arguments are known to it at the time of its creation. (Or you can import functools and use partial function application to do the same thing; a sketch of that variant follows the example.) Here is the example from the PySpark documentation, modified to pass in some parameters:
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def my_function(df, by="id", column="v", value=1.0):
    schema = "{} long, {} double".format(by, column)

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def subtract_value(pdf):
        # pdf is a pandas.DataFrame for one group
        v = pdf[column]
        g = pdf[by]
        # use the column parameter rather than a hard-coded "v"
        return pdf.assign(**{column: v - g * value})

    return df.groupby(by).apply(subtract_value)

my_function(df, by="id", column="v", value=2.0).show()
I think you could do something like this:
mykey = 1
myinterval = 2

def myfun(data, key, interval):
    # Apply some operations
    return something

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def myfun_udf(data):
    # mykey and myinterval are looked up in the enclosing scope when the UDF runs
    return myfun(data=data, key=mykey, interval=myinterval)

Data.groupBy("msn").apply(myfun_udf)