I am trying to use `pandas_udf` because my data is in a PySpark DataFrame, but I would like to apply a Python/pandas-style library to it. I have a lot of rows, so I cannot convert my PySpark DataFrame into a pandas DataFrame.
I use textdistance (`pip3 install textdistance`) and import it with `import textdistance`.
# Two-row sample DataFrame with string columns value1 and value2.
test = spark.createDataFrame(
    data=[
        ('dog cat', 'dog cat'),
        ('cup dad', 'mug'),
    ],
    schema=['value1', 'value2'],
)
# Failing attempt from the question (kept as-is to reproduce the error below).
# A SCALAR pandas_udf is called with whole column batches (pd.Series), not
# with individual strings, so textdistance.jaro_winkler receives two Series.
# Presumably textdistance then uses them in a boolean context, which is what
# raises "The truth value of a Series is ambiguous".
@pandas_udf('float', PandasUDFType.SCALAR)
def textdistance_jaro_winkler(a, b):
    # a, b: pd.Series batches of value1 / value2, not single strings.
    return textdistance.jaro_winkler(a, b)
test = test.withColumn('jaro_winkler', textdistance_jaro_winkler(col('value1'), col('value2')))
test.show()
I am getting the following error:
ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().
I also tried passing the whole DataFrame to the function (as a GROUPED_MAP pandas UDF) and working with the string values inside it, but I believe that made it worse:
# Second attempt from the question: output schema is the two input string
# columns plus a float 'jaro_winkler' column.
schema = StructType([StructField("value1", StringType(), True)
                     ,StructField("value2", StringType(), True)
                     ,StructField("jaro_winkler", FloatType(), True)
                     ])
# NOTE(review): a GROUPED_MAP pandas UDF is applied via
# df.groupBy(...).apply(udf); this snippet only defines it and never applies it.
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def textdistance_jaro_winkler(df):
    # NOTE(review): DataFrame.apply defaults to axis=0, so the lambda receives
    # each *column* as a Series, and x['value1'] looks up a row label rather
    # than a column. This is why the attempt fails; axis=1 would pass one row
    # at a time.
    df['jaro_winkler'] = df.apply(lambda x: textdistance.jaro_winkler(x['value1'], x['value2']))
    return df
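You need to rewrite your function as a Series-to-Series pandas UDF. It receives each column as a `pd.Series` batch and must return a `pd.Series` of the same length, so the strings have to be scored pair by pair: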
import pandas as pd
import textdistance
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType


def textdistance_jaro_winkler(a: pd.Series, b: pd.Series) -> pd.Series:
    """Return the Jaro-Winkler similarity of each (a[i], b[i]) pair.

    Used as a Series-to-Series pandas UDF: Spark passes each column as a
    ``pd.Series`` batch and expects a Series of the same length back.
    textdistance scores individual strings, so the pairs are scored one by one.
    """
    # Keep the batch's index so the result lines up with the input rows.
    return pd.Series(
        [textdistance.jaro_winkler(x, y) for x, y in zip(a, b)],
        index=a.index,
    )


jaro_winkler_udf = F.pandas_udf(textdistance_jaro_winkler, returnType=FloatType())
# F.col: this snippet imports the functions module as F, not `col` directly.
test = test.withColumn('jaro_winkler', jaro_winkler_udf(F.col('value1'), F.col('value2')))
test.show()
#+-------+-------+------------+
#| value1| value2|jaro_winkler|
#+-------+-------+------------+
#|dog cat|dog cat| 1.0|
#|cup dad| mug| 0.4920635|
#+-------+-------+------------+
A normal Python UDF could do the job:
import pyspark.sql.functions as F
import textdistance

# Declare the return type on the UDF itself. Without it, F.udf defaults to
# StringType, so each float score would be stringified and then cast back.
test2 = test.withColumn(
    'jaro_winkler',
    F.udf(textdistance.jaro_winkler, 'float')('value1', 'value2'),
)
test2.show()
+-------+-------+------------+
| value1| value2|jaro_winkler|
+-------+-------+------------+
|dog cat|dog cat| 1.0|
|cup dad| mug| 0.4920635|
+-------+-------+------------+
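But it is still possible to work with pandas, for example via `mapInPandas`, which passes each batch of rows to your function as a pandas DataFrame: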
import textdistance


def textdistance_jaro_winkler(iterator):
    """Add a 'jaro_winkler' column to every pandas batch (for mapInPandas).

    mapInPandas supplies an iterator of pandas DataFrames with 'value1' and
    'value2' columns; each batch is yielded back with the similarity score.
    """
    for df in iterator:
        # Zip the two columns instead of a row-wise df.apply(..., axis=1):
        # this avoids building a Series per row and always produces exactly
        # one score per row, including for an empty batch.
        df['jaro_winkler'] = [
            textdistance.jaro_winkler(x, y)
            for x, y in zip(df['value1'], df['value2'])
        ]
        yield df


test2 = test.mapInPandas(textdistance_jaro_winkler, 'value1 string, value2 string, jaro_winkler float')
test2.show()
+-------+-------+------------+
| value1| value2|jaro_winkler|
+-------+-------+------------+
|dog cat|dog cat| 1.0|
|cup dad| mug| 0.4920635|
+-------+-------+------------+
If you love us, you can donate to us via PayPal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With