I have written a UDF that is very slow, and I would like to replace it with a pandas_udf to take advantage of vectorization.
The actual UDF is a bit more complicated, but I have created a simplified toy version of it.
My question: is it possible to replace the UDF in my toy example with a pandas_udf that would take advantage of vectorization? If not, why not?
P.S.: I know I could achieve the same effect without a UDF, but only because I simplified the example; avoiding the UDF is not my goal.
from pyspark.sql import functions as f
from pyspark.sql.types import ArrayType, StringType
import pandas as pd
# Example data
df = spark.createDataFrame(
    pd.DataFrame({
        'Letter': [['A', 'A', 'C'], ['A', 'C', 'A', 'D']],
        'Number': [[2, 1, 1], [3, 1, 1, 2]],
    })
)

# The UDF I hope to replace with a pandas_udf
@f.udf(ArrayType(StringType()))
def array_func(le, nr):
    res = []
    for i in range(len(nr)):
        if nr[i] == 1:
            res.append(le[i])
        else:
            res.append('Nope')
    return res
# Applying the udf
df = df.withColumn('udf', array_func('Letter','Number'))
df.show()
How about this?
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import ArrayType, StringType, StructType, StructField
import pandas as pd

# Example data
df = spark.createDataFrame(
    pd.DataFrame({
        'Letter': [['A', 'A', 'C'], ['A', 'C', 'A', 'D']],
        'Number': [[2, 1, 1], [3, 1, 1, 2]],
    })
)
df.show()

# Add a dummy column so you can use groupby
df = df.withColumn('id', F.lit(1))

# Output schema: the input columns plus the new 'udf' column
schm = StructType(df.schema.fields + [StructField('udf', ArrayType(StringType()), True)])

@pandas_udf(schm, PandasUDFType.GROUPED_MAP)
def array_udf(pdf):
    res = []
    for ls, ns in zip(pdf['Letter'], pdf['Number']):
        r = [l if n == 1 else 'Nope' for l, n in zip(ls, ns)]
        res.append(r)
    pdf['udf'] = res
    return pdf

df = df.groupby('id').apply(array_udf).drop('id')
df.show()
The output:
+------------+------------+------------------+
| Letter| Number| udf|
+------------+------------+------------------+
| [A, A, C]| [2, 1, 1]| [Nope, A, C]|
|[A, C, A, D]|[3, 1, 1, 2]|[Nope, C, A, Nope]|
+------------+------------+------------------+
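As a side note, on Spark 3.0+ the GROUPED_MAP style above is deprecated in favour of GroupedData.applyInPandas, which takes a plain Python function plus the output schema. Here is a minimal sketch of the same logic, assuming the df (still carrying the dummy 'id' column) and the schm StructType defined above; the helper name array_udf_plain is only for illustration:
# Same grouped logic via applyInPandas (Spark 3.0+)
def array_udf_plain(pdf):
    # Build one output list per row by zipping the two array columns element-wise
    pdf['udf'] = [
        [l if n == 1 else 'Nope' for l, n in zip(ls, ns)]
        for ls, ns in zip(pdf['Letter'], pdf['Number'])
    ]
    return pdf

df = df.groupby('id').applyInPandas(array_udf_plain, schema=schm).drop('id')
df.show()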
I've created a new function named array_func_pd using pandas_udf, just to differentiate it from the original array_func, so that you have both functions to compare and play around with.
from pyspark.sql import functions as f
from pyspark.sql.types import ArrayType, StringType
import pandas as pd

@f.pandas_udf(ArrayType(StringType()))
def array_func_pd(le, nr):
    """
    le: pandas.Series< numpy.ndarray<string> >
    nr: pandas.Series< numpy.ndarray<int> >
    return: pd.Series< list<string> >
    """
    res = []
    for l_lst, n_lst in zip(le, nr):
        ret_lst = []
        res.append(ret_lst)
        for l, n in zip(l_lst.tolist(), n_lst.tolist()):
            if n == 1:
                ret_lst.append(l)
            else:
                ret_lst.append('Nope')
    return pd.Series(res)

# Applying the udf
df = df.withColumn('udf', array_func_pd('Letter', 'Number'))
df.show()
And here is the output:
+------------+------------+------------------+
| Letter| Number| udf|
+------------+------------+------------------+
| [A, A, C]| [2, 1, 1]| [Nope, A, C]|
|[A, C, A, D]|[3, 1, 1, 2]|[Nope, C, A, Nope]|
+------------+------------+------------------+
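If you are on Spark 3.0+, the same scalar pandas_udf can also be written with Python type hints instead of relying on the default SCALAR eval type. This is just an alternative spelling of array_func_pd above (the name array_func_pd_hints is only for illustration):
@f.pandas_udf(ArrayType(StringType()))
def array_func_pd_hints(le: pd.Series, nr: pd.Series) -> pd.Series:
    # One output list per input row; each row's arrays are zipped element-wise
    return pd.Series(
        [[l if n == 1 else 'Nope' for l, n in zip(ls, ns)]
         for ls, ns in zip(le, nr)]
    )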
There are two types of Pandas UDFs (aka vectorized UDFs). For your case, I think it is best to keep it simple and use a Scalar Pandas UDF.
Here is the note on Scalar Pandas UDFs from the official documentation:
The Python function should take pandas.Series as inputs and return a pandas.Series of the same length. Internally, Spark will execute a Pandas UDF by splitting columns into batches and calling the function for each batch as a subset of the data, then concatenating the results together.
So in my code:
The output of the UDF should be a pd.Series, and it should have the same length as le and nr.
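To see what that contract means, the core logic can be sanity-checked with plain pandas, no Spark required (a local sketch using the example data from above):
import pandas as pd

le = pd.Series([['A', 'A', 'C'], ['A', 'C', 'A', 'D']])
nr = pd.Series([[2, 1, 1], [3, 1, 1, 2]])

out = pd.Series(
    [[l if n == 1 else 'Nope' for l, n in zip(ls, ns)]
     for ls, ns in zip(le, nr)]
)

print(len(out) == len(le))  # True: the returned Series matches the input batch length
print(out.tolist())         # [['Nope', 'A', 'C'], ['Nope', 'C', 'A', 'Nope']]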