I want to perform a join between these two PySpark DataFrames:
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import col

sc = SparkContext()
sqlContext = SQLContext(sc)  # needed so that RDD.toDF is available

df1 = sc.parallelize([
    ['owner1', 'obj1', 0.5],
    ['owner1', 'obj1', 0.2],
    ['owner2', 'obj2', 0.1]
]).toDF(('owner', 'object', 'score'))

df2 = sc.parallelize([
    Row(owner=u'owner1',
        objects=[Row(name=u'obj1', value=Row(fav=True, ratio=0.3))])
]).toDF()
The join has to be performed on the name of the object, namely the field name inside objects for df2 and object for df1.
I am able to perform a SELECT on the nested field, as in
df2.where(df2.owner == 'owner1').select(col("objects.value.ratio")).show()
but I am not able to run this join:
df2.alias('u').join(df1.alias('s'), col('u.objects.name') == col('s.object'))
which returns the error:
pyspark.sql.utils.AnalysisException: u"cannot resolve '(objects.name = cast(object as double))' due to data type mismatch: differing types in '(objects.name = cast(object as double))' (array and double).;"
Any ideas how to solve this?
Since you want to match and extract a specific element, the simplest approach is to explode the row:
from pyspark.sql.functions import col, explode

matches = df2.withColumn("object", explode(col("objects"))).alias("u").join(
    df1.alias("s"),
    col("s.object") == col("u.object.name")
)
matches.show()
## +-------------------+------+-----------------+------+------+-----+
## | objects| owner| object| owner|object|score|
## +-------------------+------+-----------------+------+------+-----+
## |[[obj1,[true,0.3]]]|owner1|[obj1,[true,0.3]]|owner1| obj1| 0.5|
## |[[obj1,[true,0.3]]]|owner1|[obj1,[true,0.3]]|owner1| obj1| 0.2|
## +-------------------+------+-----------------+------+------+-----+
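Because both sides contribute owner and object columns, the joined result carries duplicate column names. As a possible follow-up (a minimal sketch, not part of the original answer), you can select and rename just the fields you need through the aliases:

matches.select(
    col("s.owner").alias("owner"),
    col("s.object").alias("object"),
    col("u.object.value.ratio").alias("ratio"),
    col("s.score").alias("score")
).show()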
An alternative, but very inefficient, approach is to use array_contains:
from pyspark.sql.functions import expr

matches_contains = df1.alias("s").join(
    df2.alias("u"), expr("array_contains(objects.name, object)"))
It is inefficient because it will be expanded into a Cartesian product:
matches_contains.explain()
## == Physical Plan ==
## Filter array_contains(objects#6.name,object#4)
## +- CartesianProduct
## :- Scan ExistingRDD[owner#3,object#4,score#5]
## +- Scan ExistingRDD[objects#6,owner#7]
If the size of the array is relatively small, it is possible to generate an optimized version of array_contains, as I've shown here: Filter by whether column value equals a list in spark
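For completeness, here is a minimal sketch of that idea (not taken verbatim from the linked answer): assuming the objects array never holds more than a known number of elements (MAX_OBJECTS below is a hypothetical bound), the array_contains call can be unrolled into an explicit OR of per-position equality checks. Whether this actually helps depends on your data and the resulting plan:

from functools import reduce
from operator import or_
from pyspark.sql.functions import col

# Hypothetical upper bound on the length of the objects array.
MAX_OBJECTS = 3

# Unroll array_contains(objects.name, object) into an explicit disjunction
# of per-position comparisons; out-of-range positions evaluate to NULL,
# which behaves like false inside the join condition.
unrolled = reduce(
    or_,
    [col("u.objects")[i]["name"] == col("s.object") for i in range(MAX_OBJECTS)]
)

matches_unrolled = df1.alias("s").join(df2.alias("u"), unrolled)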