
Passing multiple columns in Pandas UDF PySpark

I want to calculate the Jaro-Winkler distance between two columns of a PySpark DataFrame. Jaro-Winkler distance is available through the pyjarowinkler package, which is installed on all nodes.

pyjarowinkler works as follows:

from pyjarowinkler import distance
distance.get_jaro_distance("A", "A", winkler=True, scaling=0.1)

Output:

1.0

I am trying to write a Pandas UDF that receives the two columns as Series and calculates the distance using a lambda function. Here's how I am doing it:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyjarowinkler import distance

@pandas_udf("float", PandasUDFType.SCALAR)
def get_distance(col1, col2):
    import pandas as pd
    distance_df = pd.DataFrame({'column_A': col1, 'column_B': col2})
    distance_df['distance'] = distance_df.apply(lambda x: distance.get_jaro_distance(str(distance_df['column_A']), str(distance_df['column_B']), winkler=True, scaling=0.1))
    return distance_df['distance']

temp = temp.withColumn('jaro_distance', get_distance(temp.x, temp.x))

I should be able to pass any two string columns in the above function. I am getting the following output:

+---+---+---+-------------+
|  x|  y|  z|jaro_distance|
+---+---+---+-------------+
|  A|  1|  2|         null|
|  B|  3|  4|         null|
|  C|  5|  6|         null|
|  D|  7|  8|         null|
+---+---+---+-------------+

Expected Output:

+---+---+---+-------------+
|  x|  y|  z|jaro_distance|
+---+---+---+-------------+
|  A|  1|  2|          1.0|
|  B|  3|  4|          1.0|
|  C|  5|  6|          1.0|
|  D|  7|  8|          1.0|
+---+---+---+-------------+

I suspect this might be because str(distance_df['column_A']) is not correct: it produces the string representation of the entire column, not the value of a single row.
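That suspicion can be confirmed in plain pandas, independent of Spark: calling str() on a whole Series renders the printed table, not one cell's value.

```python
import pandas as pd

# A two-row stand-in for the 'column_A' Series the UDF receives
s = pd.Series(["A", "B"], name="column_A")

# str() on a whole Series renders the printed table, not a single value
text = str(s)
print(text)
```

So every row of the UDF output is computed from the same multi-line string, which is why the comparison fails for all rows at once.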

While this code works for me:

@pandas_udf("float", PandasUDFType.SCALAR)
def get_distance(col):
    return col.apply(lambda x: distance.get_jaro_distance(x, "A", winkler = True, scaling = 0.1))

temp = temp.withColumn('jaro_distance', get_distance(temp.x))

Output:

+---+---+---+-------------+
|  x|  y|  z|jaro_distance|
+---+---+---+-------------+
|  A|  1|  2|          1.0|
|  B|  3|  4|          0.0|
|  C|  5|  6|          0.0|
|  D|  7|  8|          0.0|
+---+---+---+-------------+

Is there a way to do this with a Pandas UDF? I'm dealing with millions of records, so a UDF will be expensive, but still acceptable if it works. Thanks.

Asked by K. K. on Dec 06 '19


1 Answer

The error comes from the function passed to df.apply: it references the whole DataFrame columns instead of the row x, and axis=1 is missing. Adjusting it as follows should fix it:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyjarowinkler import distance

@pandas_udf("float", PandasUDFType.SCALAR)
def get_distance(col1, col2):
    import pandas as pd
    distance_df = pd.DataFrame({'column_A': col1, 'column_B': col2})
    distance_df['distance'] = distance_df.apply(
        lambda x: distance.get_jaro_distance(x['column_A'], x['column_B'], winkler=True, scaling=0.1),
        axis=1)
    return distance_df['distance']

However, the pandas df.apply method is not vectorised, which defeats the purpose of using pandas_udf over a plain udf in PySpark. A faster, lower-overhead solution is a list comprehension that builds the returned pd.Series (check this link for more discussion about pandas df.apply and its alternatives):

from pandas import Series

@pandas_udf("float", PandasUDFType.SCALAR)
def get_distance(col1, col2):
    return Series([distance.get_jaro_distance(c1, c2, winkler=True, scaling=0.1) for c1, c2 in zip(col1, col2)])

df.withColumn('jaro_distance', get_distance('x', 'y')).show()
+---+---+---+-------------+
|  x|  y|  z|jaro_distance|
+---+---+---+-------------+
| AB| 1B|  2|         0.67|
| BB| BB|  4|          1.0|
| CB| 5D|  6|          0.0|
| DB|B7F|  8|         0.61|
+---+---+---+-------------+
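The equivalence of row-wise df.apply and the zip-based list comprehension can be checked in plain pandas, using a hypothetical pair_score function as a stand-in for distance.get_jaro_distance (pyjarowinkler is not needed for the illustration):

```python
import pandas as pd

def pair_score(a, b):
    # hypothetical stand-in for distance.get_jaro_distance
    return float(a == b)

df = pd.DataFrame({"column_A": ["AB", "BB"], "column_B": ["1B", "BB"]})

# row-wise apply: one Python call per row, plus per-row Series construction
via_apply = df.apply(lambda r: pair_score(r["column_A"], r["column_B"]), axis=1)

# list comprehension over zipped columns: same results, less per-row overhead
via_zip = pd.Series([pair_score(a, b) for a, b in zip(df["column_A"], df["column_B"])])

assert via_apply.tolist() == via_zip.tolist()
```

Both produce the same values; the comprehension simply avoids the per-row machinery that df.apply adds on top of each function call.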
Answered by jxc on Oct 12 '22