The Python pandas library provides the following function:
DataFrame.merge(right, how='inner', on=None, left_on=None, right_on=None,
                left_index=False, right_index=False, sort=False,
                suffixes=('_x', '_y'), copy=True, indicator=False)
The indicator parameter, combined with pandas' value_counts() function, can be used to quickly determine how well a join performed.
Example:
In [48]: df1 = pd.DataFrame({'col1': [0, 1], 'col_left':['a', 'b']})
In [49]: df2 = pd.DataFrame({'col1': [1, 2, 2],'col_right':[2, 2, 2]})
In [50]: pd.merge(df1, df2, on='col1', how='outer', indicator=True)
Out[50]:
col1 col_left col_right _merge
0 0 a NaN left_only
1 1 b 2.0 both
2 2 NaN 2.0 right_only
3 2 NaN 2.0 right_only
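The summary then comes from running value_counts() on the indicator column; a minimal continuation of the example above (counts read off the Out[50] table):

In [51]: pd.merge(df1, df2, on='col1', how='outer', indicator=True)['_merge'].value_counts()

For this example it reports 2 right_only rows, 1 both row and 1 left_only row, i.e. how many keys matched on both sides and how many are only present in one of the inputs.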
What is the best way to check how well a join performed within a Spark DataFrame?
A custom function was provided in one of the answers. It does not yet give the correct results, but it would be great if it did:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

ASchema = StructType([StructField('id', IntegerType(), nullable=False),
                      StructField('name', StringType(), nullable=False)])
BSchema = StructType([StructField('id', IntegerType(), nullable=False),
                      StructField('role', StringType(), nullable=False)])
AData = sc.parallelize([Row(1, 'michel'), Row(2, 'diederik'), Row(3, 'rok'), Row(4, 'piet')])
BData = sc.parallelize([Row(1, 'engineer'), Row(2, 'lead'), Row(3, 'scientist'), Row(5, 'manager')])
ADF = hc.createDataFrame(AData, ASchema)
BDF = hc.createDataFrame(BData, BSchema)
DFJOIN = ADF.join(BDF, ADF['id'] == BDF['id'], "outer")
DFJOIN.show()
Input:
+----+--------+----+---------+
| id| name| id| role|
+----+--------+----+---------+
| 1| michel| 1| engineer|
| 2|diederik| 2| lead|
| 3| rok| 3|scientist|
| 4| piet|null| null|
|null| null| 5| manager|
+----+--------+----+---------+
from pyspark.sql.functions import *
DFJOINMERGE = DFJOIN.withColumn("_merge", when(ADF["id"].isNull(), "right_only").when(BDF["id"].isNull(), "left_only").otherwise("both"))\
.withColumn("id", coalesce(ADF["id"], BDF["id"]))\
.drop(ADF["id"])\
.drop(BDF["id"])
DFJOINMERGE.show()
Output:
+---+--------+---+---------+------+
| id| name| id| role|_merge|
+---+--------+---+---------+------+
| 1| michel| 1| engineer| both|
| 2|diederik| 2| lead| both|
| 3| rok| 3|scientist| both|
| 4| piet| 4| null| both|
| 5| null| 5| manager| both|
+---+--------+---+---------+------+
==> I would expect id 4 to be tagged left_only, and id 5 to be tagged right_only.
Changing join to "left":
Input:
+---+--------+----+---------+
| id| name| id| role|
+---+--------+----+---------+
| 1| michel| 1| engineer|
| 2|diederik| 2| lead|
| 3| rok| 3|scientist|
| 4| piet|null| null|
+---+--------+----+---------+
Output:
+---+--------+---+---------+------+
| id| name| id| role|_merge|
+---+--------+---+---------+------+
| 1| michel| 1| engineer| both|
| 2|diederik| 2| lead| both|
| 3| rok| 3|scientist| both|
| 4| piet| 4| null| both|
+---+--------+---+---------+------+
Try this:
>>> from pyspark.sql.functions import *
>>> sdf1 = sqlContext.createDataFrame(df1)
>>> sdf2 = sqlContext.createDataFrame(df2)
>>> sdf = sdf1.join(sdf2, sdf1["col1"] == sdf2["col1"], "outer")
>>> sdf.withColumn("_merge", when(sdf1["col1"].isNull(), "right_only").when(sdf2["col1"].isNull(), "left_only").otherwise("both"))\
... .withColumn("col1", coalesce(sdf1["col1"], sdf2["col1"]))\
... .drop(sdf1["col1"])\
... .drop(sdf2["col1"])
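To get a pandas-style summary of how the join performed, assign the chained expression above to a variable and aggregate on the indicator column (merged is an assumed name; the groupBy mirrors value_counts() on the pandas side):

>>> merged = sdf.withColumn("_merge", when(sdf1["col1"].isNull(), "right_only").when(sdf2["col1"].isNull(), "left_only").otherwise("both"))\
...     .withColumn("col1", coalesce(sdf1["col1"], sdf2["col1"]))\
...     .drop(sdf1["col1"])\
...     .drop(sdf2["col1"])
>>> merged.groupBy("_merge").count().show()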
Altered LostInOverflow's answer and got this working:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

ASchema = StructType([StructField('ida', IntegerType(), nullable=False),
                      StructField('name', StringType(), nullable=False)])
BSchema = StructType([StructField('idb', IntegerType(), nullable=False),
                      StructField('role', StringType(), nullable=False)])
AData = sc.parallelize([Row(1, 'michel'), Row(2, 'diederik'), Row(3, 'rok'), Row(4, 'piet')])
BData = sc.parallelize([Row(1, 'engineer'), Row(2, 'lead'), Row(3, 'scientist'), Row(5, 'manager')])
ADF = hc.createDataFrame(AData, ASchema)
BDF = hc.createDataFrame(BData, BSchema)
DFJOIN = ADF.join(BDF, ADF['ida'] == BDF['idb'], "outer")
DFJOIN.show()
+----+--------+----+---------+
| ida| name| idb| role|
+----+--------+----+---------+
| 1| michel| 1| engineer|
| 2|diederik| 2| lead|
| 3| rok| 3|scientist|
| 4| piet|null| null|
|null| null| 5| manager|
+----+--------+----+---------+
from pyspark.sql.functions import *
DFJOINMERGE = DFJOIN.withColumn("_merge", when(DFJOIN["ida"].isNull(), "right_only").when(DFJOIN["idb"].isNull(), "left_only").otherwise("both"))\
.withColumn("id", coalesce(ADF["ida"], BDF["idb"]))\
.drop(DFJOIN["ida"])\
.drop(DFJOIN["idb"])
#DFJOINMERGE.show()
DFJOINMERGE.groupBy("_merge").count().show()
+----------+-----+
| _merge|count|
+----------+-----+
|right_only| 1|
| left_only| 1|
| both| 3|
+----------+-----+
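The decisive change compared to the first attempt is renaming the join keys to ida and idb: because the two sides no longer share a column name, the isNull() checks after the outer join resolve unambiguously and every row gets the correct tag. For reference, a rough equivalent on a current PySpark setup, assuming a SparkSession named spark instead of the sc/hc contexts used above:

from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, when

spark = SparkSession.builder.getOrCreate()

# distinct join-key names on each side avoid ambiguous column references
adf = spark.createDataFrame([(1, 'michel'), (2, 'diederik'), (3, 'rok'), (4, 'piet')], ['ida', 'name'])
bdf = spark.createDataFrame([(1, 'engineer'), (2, 'lead'), (3, 'scientist'), (5, 'manager')], ['idb', 'role'])

joined = adf.join(bdf, adf['ida'] == bdf['idb'], 'outer')

indicated = (joined
             .withColumn('_merge', when(joined['ida'].isNull(), 'right_only')
                                   .when(joined['idb'].isNull(), 'left_only')
                                   .otherwise('both'))
             .withColumn('id', coalesce(joined['ida'], joined['idb']))
             .drop('ida', 'idb'))

# pandas-style summary of how the join performed
indicated.groupBy('_merge').count().show()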