
Pandas udf loop over PySpark dataframe rows

I am trying to use pandas_udf because my data is in a PySpark DataFrame but I would like to use a pandas-based library. I have too many rows to convert the PySpark DataFrame into a pandas DataFrame.

I use textdistance (pip3 install textdistance) and import it with import textdistance.

from pyspark.sql.functions import col, pandas_udf, PandasUDFType

test = spark.createDataFrame(
    [('dog cat', 'dog cat'),
     ('cup dad', 'mug')],
    ['value1', 'value2']
)

@pandas_udf('float', PandasUDFType.SCALAR)
def textdistance_jaro_winkler(a, b):
    return textdistance.jaro_winkler(a, b)

test = test.withColumn('jaro_winkler', textdistance_jaro_winkler(col('value1'), col('value2')))
test.show()

I am getting the following error:

ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().
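The likely cause: a scalar pandas UDF passes whole pandas Series into the function, so textdistance's internal string comparisons end up operating on Series, whose truth value is ambiguous. A minimal, library-free sketch reproducing the same ValueError (illustrative only, not textdistance's actual code path):

```python
import pandas as pd

# A scalar pandas UDF hands entire Series to the function, so any
# internal `if a == b:`-style check compares Series, not strings.
s1 = pd.Series(['dog cat', 'cup dad'])
s2 = pd.Series(['dog cat', 'mug'])

try:
    if s1 == s2:  # element-wise comparison yields a boolean Series
        pass
except ValueError as e:
    print(e)  # the same ambiguous-truth-value error as above
```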

I also tried passing the whole DataFrame as an argument to the function (and passing string values inside it), but I believe that made it worse:

schema = StructType([StructField("value1", StringType(), True)
                     ,StructField("value2", StringType(), True)
                     ,StructField("jaro_winkler", FloatType(), True)
                    ])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def textdistance_jaro_winkler(df):
    df['jaro_winkler'] = df.apply(lambda x: textdistance.jaro_winkler(x['value1'],  x['value2']))
    
    return df
asked Sep 20 '25 21:09 by Norah Jones

2 Answers

You need to rewrite your function as a Series-to-Series pandas UDF:

import pandas as pd
import textdistance
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType

def textdistance_jaro_winkler(a: pd.Series, b: pd.Series) -> pd.Series:
    return pd.Series([textdistance.jaro_winkler(x, y) for x, y in zip(a, b)])


jaro_winkler_udf = F.pandas_udf(textdistance_jaro_winkler, returnType=FloatType())

test = test.withColumn('jaro_winkler', jaro_winkler_udf(col('value1'), col('value2')))
test.show()

#+-------+-------+------------+
#| value1| value2|jaro_winkler|
#+-------+-------+------------+
#|dog cat|dog cat|         1.0|
#|cup dad|    mug|   0.4920635|
#+-------+-------+------------+
answered Sep 22 '25 20:09 by blackbishop

A normal Python UDF could do the job:

import pyspark.sql.functions as F
import textdistance

test2 = test.withColumn(
    'jaro_winkler',
    F.udf(textdistance.jaro_winkler)('value1', 'value2').cast('float')
)

test2.show()
+-------+-------+------------+
| value1| value2|jaro_winkler|
+-------+-------+------------+
|dog cat|dog cat|         1.0|
|cup dad|    mug|   0.4920635|
+-------+-------+------------+

But it's also possible to stay in pandas by using mapInPandas, which receives an iterator of pandas DataFrames:

import textdistance

def textdistance_jaro_winkler(iterator):
    for df in iterator:
        df['jaro_winkler'] = df.apply(lambda x: textdistance.jaro_winkler(x['value1'], x['value2']), axis=1)
        yield df

test2 = test.mapInPandas(textdistance_jaro_winkler, 'value1 string, value2 string, jaro_winkler float')

test2.show()
+-------+-------+------------+
| value1| value2|jaro_winkler|
+-------+-------+------------+
|dog cat|dog cat|         1.0|
|cup dad|    mug|   0.4920635|
+-------+-------+------------+
answered Sep 22 '25 20:09 by mck