PySpark DataFrame: apply a function to two columns

Say I have two PySpark DataFrames df1 and df2.

df1=   'a' 
        1    
        2    
        5    

df2=   'b'
        3
        6

And I want to find the closest df2['b'] value for each df1['a'], and add the closest values as a new column in df1.

In other words, for each value x in df1['a'], I want to find the y that achieves min(abs(x-y)) over all y in df2['b'] (note: we can assume there is exactly one y that achieves the minimum distance), and the result would be

'a'    'b'
 1      3
 2      3
 5      6

I tried the following code to create a distance matrix first (before finding the values achieving the minimum distance):

from pyspark.sql import SQLContext
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

# absolute distance between two values
def dist(x, y):
    return abs(x - y)

udf_dict = udf(dist, IntegerType())

sql_sc = SQLContext(sc)
udf_dict(df1.a, df2.b)

which gives

Column<PythonUDF#dist(a,b)>

Then I tried

sql_sc.createDataFrame(udf_dict(df1.a, df2.b))

which runs forever without producing an error or any output.

My questions are:

  1. As I'm new to Spark, is my way of constructing the output DataFrame efficient? (My approach is to first build a distance matrix over all the a and b values, and then pick the pairs that achieve the minimum.)
  2. What's wrong with the last line of my code, and how can I fix it?
asked Nov 02 '16 by Chianti5


1 Answer

Starting with your second question: you can only apply a udf to an existing DataFrame. I think you were looking for something like this:

>>> df1.join(df2).withColumn('distance', udf_dict(df1.a, df2.b)).show()
+---+---+--------+
|  a|  b|distance|
+---+---+--------+
|  1|  3|       2|
|  1|  6|       5|
|  2|  3|       1|
|  2|  6|       4|
|  5|  3|       2|
|  5|  6|       1|
+---+---+--------+
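
Note that df1.join(df2) with no join condition is a cross join, so this intermediate result has len(df1) × len(df2) rows; that's fine for a small lookup table like df2, but worth keeping in mind for larger inputs (and on Spark 2.x you may need to set spark.sql.crossJoin.enabled=true). If you want to reproduce the example, here is a minimal sketch of building the two DataFrames, reusing the sql_sc SQLContext from the question:

>>> # hypothetical setup to reproduce the question's data
>>> df1 = sql_sc.createDataFrame([(1,), (2,), (5,)], ['a'])
>>> df2 = sql_sc.createDataFrame([(3,), (6,)], ['b'])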

But there is a more efficient way to compute this distance, using the built-in abs function from pyspark.sql.functions; unlike a Python udf, such column expressions are evaluated natively by Spark and avoid the overhead of serializing rows to Python:

>>> from pyspark.sql.functions import abs
>>> df1.join(df2).withColumn('distance', abs(df1.a - df2.b))

Then you can find the matching numbers by computing the minimum distance per value of a and joining back on it:

>>> from pyspark.sql.functions import abs, min
>>> distances = df1.join(df2).withColumn('distance', abs(df1.a - df2.b))
>>> min_distances = distances.groupBy('a').agg(min('distance').alias('distance'))
>>> distances.join(min_distances, ['a', 'distance']).select('a', 'b').show()
+---+---+                                                                       
|  a|  b|
+---+---+
|  5|  6|
|  1|  3|
|  2|  3|
+---+---+
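
One caveat, in case the uniqueness assumption from the question ever fails: if two values of b tie for the minimum distance for some a, the final join returns one row per tie. A sketch of keeping a single match per a (dropDuplicates is standard PySpark; which tied row survives is not deterministic):

>>> # keep one arbitrary row per value of 'a' in case of ties
>>> distances.join(min_distances, ['a', 'distance']).select('a', 'b').dropDuplicates(['a']).show()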
answered Oct 14 '22 by Mariusz