Does Spark DataFrame have an equivalent option to Pandas' merge indicator?

The Python Pandas library contains the following function:

DataFrame.merge(right, how='inner', on=None, left_on=None, right_on=None, left_index=False,
                right_index=False, sort=False, suffixes=('_x', '_y'), copy=True,
                indicator=False)

The indicator field, combined with Pandas' value_counts() function, can be used to quickly determine how well a join performed.

Example:

In [48]: df1 = pd.DataFrame({'col1': [0, 1], 'col_left':['a', 'b']})

In [49]: df2 = pd.DataFrame({'col1': [1, 2, 2],'col_right':[2, 2, 2]})

In [50]: pd.merge(df1, df2, on='col1', how='outer', indicator=True)
Out[50]: 
   col1 col_left  col_right      _merge
0     0        a        NaN   left_only
1     1        b        2.0        both
2     2      NaN        2.0  right_only
3     2      NaN        2.0  right_only
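
The Pandas-style summary the question refers to is then just value_counts() applied to the _merge column of that result (a minimal sketch reusing df1 and df2 from above):

In [51]: pd.merge(df1, df2, on='col1', how='outer', indicator=True)['_merge'].value_counts()

For the example output above this reports 2 right_only rows, 1 both and 1 left_only.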

What is the best way to check how well a join performed on a Spark DataFrame?

A custom approach was provided in one of the answers. It does not yet give the correct results, but it would be great if it did:

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

ASchema = StructType([StructField('id', IntegerType(), nullable=False),
                      StructField('name', StringType(), nullable=False)])
BSchema = StructType([StructField('id', IntegerType(), nullable=False),
                      StructField('role', StringType(), nullable=False)])
AData = sc.parallelize([Row(1, 'michel'), Row(2, 'diederik'), Row(3, 'rok'), Row(4, 'piet')])
BData = sc.parallelize([Row(1, 'engineer'), Row(2, 'lead'), Row(3, 'scientist'), Row(5, 'manager')])
ADF = hc.createDataFrame(AData, ASchema)
BDF = hc.createDataFrame(BData, BSchema)
DFJOIN = ADF.join(BDF, ADF['id'] == BDF['id'], "outer")
DFJOIN.show()

Input:
+----+--------+----+---------+
|  id|    name|  id|     role|
+----+--------+----+---------+
|   1|  michel|   1| engineer|
|   2|diederik|   2|     lead|
|   3|     rok|   3|scientist|
|   4|    piet|null|     null|
|null|    null|   5|  manager|
+----+--------+----+---------+

from pyspark.sql.functions import *
DFJOINMERGE = DFJOIN.withColumn("_merge", when(ADF["id"].isNull(), "right_only").when(BDF["id"].isNull(), "left_only").otherwise("both"))\
  .withColumn("id", coalesce(ADF["id"], BDF["id"]))\
   .drop(ADF["id"])\
   .drop(BDF["id"])
DFJOINMERGE.show()

Output
+---+--------+---+---------+------+
| id|    name| id|     role|_merge|
+---+--------+---+---------+------+
|  1|  michel|  1| engineer|  both|
|  2|diederik|  2|     lead|  both|
|  3|     rok|  3|scientist|  both|
|  4|    piet|  4|     null|  both|
|  5|    null|  5|  manager|  both|
+---+--------+---+---------+------+

==> I would expect id 4 to be left_only, and id 5 to be right_only.

Changing join to "left":


Input:
+---+--------+----+---------+
| id|    name|  id|     role|
+---+--------+----+---------+
|  1|  michel|   1| engineer|
|  2|diederik|   2|     lead|
|  3|     rok|   3|scientist|
|  4|    piet|null|     null|
+---+--------+----+---------+

Output
+---+--------+---+---------+------+
| id|    name| id|     role|_merge|
+---+--------+---+---------+------+
|  1|  michel|  1| engineer|  both|
|  2|diederik|  2|     lead|  both|
|  3|     rok|  3|scientist|  both|
|  4|    piet|  4|     null|  both|
+---+--------+---+---------+------+
asked Aug 02 '16 by mnos



2 Answers

Try this:

>>> from pyspark.sql.functions import *
>>> sdf1 = sqlContext.createDataFrame(df1)
>>> sdf2 = sqlContext.createDataFrame(df2)
>>> sdf = sdf1.join(sdf2, sdf1["col1"] == sdf2["col1"], "outer")
>>> sdf.withColumn("_merge", when(sdf1["col1"].isNull(), "right_only").when(sdf2["col1"].isNull(), "left_only").otherwise("both"))\
...  .withColumn("col1", coalesce(sdf1["col1"], sdf2["col1"]))\
...   .drop(sdf1["col1"])\
...   .drop(sdf2["col1"])
answered Oct 15 '22 by user6022341

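To get a summary comparable to Pandas' value_counts(), the joined result can be aggregated on the indicator column (a small follow-up sketch, assuming the chained expression above is assigned to a variable named merged):

>>> merged.groupBy("_merge").count().show()  # Spark analogue of value_counts() on _merge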

Altered LostInOverflow's answer and got this working by renaming the join columns to ida and idb, so the two key columns can be told apart:

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

ASchema = StructType([StructField('ida', IntegerType(), nullable=False),
                      StructField('name', StringType(), nullable=False)])
BSchema = StructType([StructField('idb', IntegerType(), nullable=False),
                      StructField('role', StringType(), nullable=False)])
AData = sc.parallelize([Row(1, 'michel'), Row(2, 'diederik'), Row(3, 'rok'), Row(4, 'piet')])
BData = sc.parallelize([Row(1, 'engineer'), Row(2, 'lead'), Row(3, 'scientist'), Row(5, 'manager')])
ADF = hc.createDataFrame(AData, ASchema)
BDF = hc.createDataFrame(BData, BSchema)
DFJOIN = ADF.join(BDF, ADF['ida'] == BDF['idb'], "outer")
DFJOIN.show()


+----+--------+----+---------+
| ida|    name| idb|     role|
+----+--------+----+---------+
|   1|  michel|   1| engineer|
|   2|diederik|   2|     lead|
|   3|     rok|   3|scientist|
|   4|    piet|null|     null|
|null|    null|   5|  manager|
+----+--------+----+---------+

from pyspark.sql.functions import *
DFJOINMERGE = DFJOIN.withColumn("_merge", when(DFJOIN["ida"].isNull(), "right_only").when(DFJOIN["idb"].isNull(), "left_only").otherwise("both"))\
  .withColumn("id", coalesce(ADF["ida"], BDF["idb"]))\
   .drop(DFJOIN["ida"])\
   .drop(DFJOIN["idb"])
#DFJOINMERGE.show()
DFJOINMERGE.groupBy("_merge").count().show()

+----------+-----+
|    _merge|count|
+----------+-----+
|right_only|    1|
| left_only|    1|
|      both|    3|
+----------+-----+
answered Oct 15 '22 by mnos
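
A variant that keeps the original column names: instead of renaming the keys, the two DataFrames can be given aliases and the qualified columns selected explicitly. This is only a sketch, assuming the ADF/BDF frames from the question (both keyed on id) and a reasonably recent PySpark:

from pyspark.sql import functions as F

a = ADF.alias("a")
b = BDF.alias("b")

# Outer join on the aliased key columns, then build the Pandas-style indicator
merged = (a.join(b, F.col("a.id") == F.col("b.id"), "outer")
           .select(F.coalesce(F.col("a.id"), F.col("b.id")).alias("id"),
                   F.col("a.name"),
                   F.col("b.role"),
                   F.when(F.col("a.id").isNull(), "right_only")
                    .when(F.col("b.id").isNull(), "left_only")
                    .otherwise("both")
                    .alias("_merge")))

merged.groupBy("_merge").count().show()  # same counts as above: 3 both, 1 left_only, 1 right_only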