Joining PySpark DataFrames on nested field

I want to perform a join between these two PySpark DataFrames:

from pyspark import SparkContext
from pyspark.sql import Row, SQLContext
from pyspark.sql.functions import col

sc = SparkContext()
sqlContext = SQLContext(sc)  # required so that RDD.toDF() is available

df1 = sc.parallelize([
    ['owner1', 'obj1', 0.5],
    ['owner1', 'obj1', 0.2],
    ['owner2', 'obj2', 0.1]
]).toDF(('owner', 'object', 'score'))

df2 = sc.parallelize([
    Row(owner=u'owner1',
        objects=[Row(name=u'obj1', value=Row(fav=True, ratio=0.3))])
]).toDF()

The join has to be performed on the name of the object, that is, the field name inside objects for df2 and the column object for df1.

I am able to perform a SELECT on the nested field, as in

df2.where(df2.owner == 'owner1').select(col("objects.value.ratio")).show()

but I am not able to run this join:

df2.alias('u').join(df1.alias('s'), col('u.objects.name') == col('s.object'))

which fails with the following error:

pyspark.sql.utils.AnalysisException: u"cannot resolve '(objects.name = cast(object as double))' due to data type mismatch: differing types in '(objects.name = cast(object as double))' (array and double).;"

Any ideas how to solve this?

asked Apr 12 '16 by mar tin

People also ask

How do I merge two DataFrames with different columns in Spark?

To merge two PySpark DataFrames with different columns, use the unionByName() transformation. First create the DataFrames with their differing sets of columns, then add the columns each side is missing (for example, 'state' and 'salary' to df1 and 'age' to df2) as null values before performing the union.
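A rough sketch of that pattern (assuming a SparkSession named spark and Spark 2.3+ for unionByName; the data and column names are purely illustrative):

from pyspark.sql.functions import lit

df_a = spark.createDataFrame([("james", 30)], ["name", "age"])
df_b = spark.createDataFrame([("anna", "NY", 5000)], ["name", "state", "salary"])

# Add the columns each side is missing as nulls, then union by column name.
merged = (
    df_a.withColumn("state", lit(None).cast("string"))
        .withColumn("salary", lit(None).cast("long"))
        .unionByName(df_b.withColumn("age", lit(None).cast("long")))
)
merged.show()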

How do I join two PySpark data frames?

Summary: PySpark DataFrames have a join method that takes three parameters: the DataFrame on the right side of the join, the fields being joined on, and the type of join (inner, outer, left_outer, right_outer, leftsemi). You call join on the left-side DataFrame, e.g. df1.join(df2, ...).
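For example, a minimal sketch of that call pattern (assuming a SparkSession named spark; data and column names are made up):

left = spark.createDataFrame([("owner1", 0.5)], ["owner", "score"])
right = spark.createDataFrame([("owner1", "obj1")], ["owner", "object"])

# right-side DataFrame, join condition, join type
joined = left.join(right, left.owner == right.owner, "inner")
joined.show()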

How do I read nested columns in PySpark?

If you have a struct (StructType) column on a PySpark DataFrame, you need to use an explicit column qualifier in order to select the nested struct fields.
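A small illustration of that qualifier syntax (assuming a SparkSession named spark; the schema mirrors df2 above):

from pyspark.sql import Row
from pyspark.sql.functions import col

nested = spark.createDataFrame(
    [Row(owner=u'owner1', value=Row(fav=True, ratio=0.3))])

# Qualify the nested fields with the parent struct column.
nested.select(col("value.fav"), col("value.ratio")).show()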

How do I join PySpark DataFrames on multiple columns?

PySpark's join() takes the right dataset as its first argument and joinExprs and joinType as its second and third arguments; joinExprs is used to provide the join condition on multiple columns. Note that both joinExprs and joinType are optional arguments.
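A minimal sketch of a multi-column join condition (again with made-up data and column names, and a SparkSession named spark):

left = spark.createDataFrame([("owner1", "obj1", 0.5)], ["owner", "object", "score"])
right = spark.createDataFrame([("owner1", "obj1", True)], ["owner", "object", "fav"])

# joinExprs combines several column comparisons; joinType defaults to "inner".
joined = left.join(
    right,
    (left.owner == right.owner) & (left.object == right.object),
    "inner",
)
joined.show()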


1 Answer

Since you want to match and extract a specific element, the simplest approach is to explode the objects array:

from pyspark.sql.functions import col, explode, expr

# Explode the objects array into one row per struct, then join on the struct's name field.
matches = df2.withColumn("object", explode(col("objects"))).alias("u").join(
  df1.alias("s"),
  col("s.object") == col("u.object.name")
)

matches.show()
## +-------------------+------+-----------------+------+------+-----+
## |            objects| owner|           object| owner|object|score|
## +-------------------+------+-----------------+------+------+-----+
## |[[obj1,[true,0.3]]]|owner1|[obj1,[true,0.3]]|owner1|  obj1|  0.5|
## |[[obj1,[true,0.3]]]|owner1|[obj1,[true,0.3]]|owner1|  obj1|  0.2|
## +-------------------+------+-----------------+------+------+-----+
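From there, if only specific fields are needed, they can be selected off the joined result (a small follow-up, not part of the original answer; it reuses the aliases defined above):

matches.select(
    col("s.owner"),
    col("u.object.value.ratio"),
    col("s.score")
).show()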

An alternative, but very inefficient, approach is to use array_contains:

matches_contains = df1.alias("s").join(
  df2.alias("u"), expr("array_contains(objects.name, object)"))

It is inefficient because it will be expanded to a Cartesian product:

matches_contains.explain()
## == Physical Plan ==
## Filter array_contains(objects#6.name,object#4)
## +- CartesianProduct
##    :- Scan ExistingRDD[owner#3,object#4,score#5] 
##    +- Scan ExistingRDD[objects#6,owner#7]

If the size of the array is relatively small, it is possible to generate an optimized version of array_contains, as I've shown here: Filter by whether column value equals a list in spark

answered Nov 02 '22 by zero323