Say I have two PySpark DataFrames df1 and df2.
df1 =
'a'
 1
 2
 5

df2 =
'b'
 3
 6
And I want to find the closest df2['b'] value for each df1['a'], and add the closest values as a new column in df1.
In other words, for each value x in df1['a'], I want to find the y in df2['b'] that achieves min(abs(x - y)) (you can assume that there is only one y achieving the minimum distance), and the result would be
'a' 'b'
1 3
2 3
5 6
I tried the following code to create a distance matrix first (before finding the values achieving the minimum distance):
from pyspark.sql import SQLContext
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

# absolute distance between two values
def dist(x, y):
    return abs(x - y)

udf_dict = udf(dist, IntegerType())
sql_sc = SQLContext(sc)
udf_dict(df1.a, df2.b)
which gives
Column<PythonUDF#dist(a,b)>
Then I tried
sql_sc.CreateDataFrame(udf_dict(df1.a, df2.b))
which runs forever without producing any error or output.
My questions are:
1. How can I efficiently find the closest value (e.g., do I need to compute the distances between all a and b values first, and then find the min one)?
2. How do I correctly apply the udf to add the distance column to a DataFrame?
Starting with your second question: you can apply a udf only to the columns of an existing DataFrame. I think you were looking for something like this:
>>> df1.join(df2).withColumn('distance', udf_dict(df1.a, df2.b)).show()
+---+---+--------+
| a| b|distance|
+---+---+--------+
| 1| 3| 2|
| 1| 6| 5|
| 2| 3| 1|
| 2| 6| 4|
| 5| 3| 2|
| 5| 6| 1|
+---+---+--------+
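Note that df1.join(df2) with no join condition is a Cartesian product, and depending on your Spark version you may need to enable cross joins or request one explicitly. A minimal sketch (assuming Spark 2.1+, reusing udf_dict from the question):
>>> pairs = df1.crossJoin(df2)  # explicit Cartesian product of all (a, b) pairs
>>> pairs.withColumn('distance', udf_dict(df1.a, df2.b)).show()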
But there is a more efficient way to compute this distance, using the built-in abs function (a native column expression avoids the serialization overhead of a Python udf):
>>> from pyspark.sql.functions import abs
>>> df1.join(df2).withColumn('distance', abs(df1.a - df2.b))
Then you can find the matching values by computing the minimum distance for each a and joining back:
>>> from pyspark.sql.functions import min
>>> distances = df1.join(df2).withColumn('distance', abs(df1.a - df2.b))
>>> min_distances = distances.groupBy('a').agg(min('distance').alias('distance'))
>>> distances.join(min_distances, ['a', 'distance']).select('a', 'b').show()
+---+---+
| a| b|
+---+---+
| 5| 6|
| 1| 3|
| 2| 3|
+---+---+
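For completeness, here is a minimal end-to-end sketch of the whole approach, assuming a Spark 2.x+ session available as spark (the aliases sql_abs/sql_min are just my choice to avoid shadowing the Python builtins):

from pyspark.sql.functions import abs as sql_abs, min as sql_min

df1 = spark.createDataFrame([(1,), (2,), (5,)], ['a'])
df2 = spark.createDataFrame([(3,), (6,)], ['b'])

# all (a, b) pairs with their absolute distance
distances = df1.crossJoin(df2).withColumn('distance', sql_abs(df1.a - df2.b))

# smallest distance for each value of a
min_distances = distances.groupBy('a').agg(sql_min('distance').alias('distance'))

# keep only the pairs that achieve that minimum
distances.join(min_distances, ['a', 'distance']).select('a', 'b').show()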