PySpark: match the values of a DataFrame column against another DataFrame column

In a pandas DataFrame, I can use the DataFrame.isin() function to match a column's values against the values of another column.

For example, suppose we have one DataFrame:

df_A = pd.DataFrame({'col1': ['A', 'B', 'C', 'B', 'C', 'D'], 
                     'col2': [1, 2, 3, 4, 5, 6]})
df_A

    col1  col2
0    A     1
1    B     2
2    C     3
3    B     4
4    C     5
5    D     6         

and another DataFrame:

df_B = pd.DataFrame({'col1': ['C', 'E', 'D', 'C', 'F', 'G', 'H'], 
                     'col2': [10, 20, 30, 40, 50, 60, 70]})
df_B

    col1  col2
0    C    10
1    E    20
2    D    30
3    C    40
4    F    50
5    G    60
6    H    70       

I can use the .isin() function to match the column values of df_B against the column values of df_A.

E.g.:

df_B[df_B['col1'].isin(df_A['col1'])]

yields:

    col1  col2
0    C    10
2    D    30
3    C    40

What's the equivalent operation on a PySpark DataFrame?

df_A = pd.DataFrame({'col1': ['A', 'B', 'C', 'B', 'C', 'D'], 
                     'col2': [1, 2, 3, 4, 5, 6]})
df_A = sqlContext.createDataFrame(df_A)

df_B = pd.DataFrame({'col1': ['C', 'E', 'D', 'C', 'F', 'G', 'H'], 
                     'col2': [10, 20, 30, 40, 50, 60, 70]})
df_B = sqlContext.createDataFrame(df_B)


df_B[df_B['col1'].isin(df_A['col1'])]

The .isin() code above gives me an error message:

u'resolved attribute(s) col1#9007 missing from 
col1#9012,col2#9013L in operator !Filter col1#9012 IN 
(col1#9007);;\n!Filter col1#9012 IN (col1#9007)\n+- 
LogicalRDD [col1#9012, col2#9013L]\n'
asked Mar 02 '17 by cwl



1 Answer

This kind of operation is called a left semi join in Spark:

df_B.join(df_A, ['col1'], 'leftsemi')
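A left semi join keeps each row of df_B whose col1 has at least one match in df_A, and it returns only df_B's columns. Here is a plain-Python sketch of that semantics on the example data (not Spark itself); 'leftanti' is the complementary Spark join type that keeps the non-matching rows:

```python
# Emulate Spark's 'leftsemi' and 'leftanti' join semantics in plain Python,
# using the example data from the question.
df_A_col1 = {'A', 'B', 'C', 'D'}  # distinct col1 values of df_A
df_B = [('C', 10), ('E', 20), ('D', 30), ('C', 40),
        ('F', 50), ('G', 60), ('H', 70)]

semi = [row for row in df_B if row[0] in df_A_col1]      # rows with a match
anti = [row for row in df_B if row[0] not in df_A_col1]  # rows without a match

print(semi)  # [('C', 10), ('D', 30), ('C', 40)]
print(anti)  # [('E', 20), ('F', 50), ('G', 60), ('H', 70)]
```

Note that the semi-join result matches the pandas .isin() output in the question, with duplicates in df_B preserved.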
answered Sep 16 '22 by Mariusz
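A side note on the original error: PySpark's Column.isin() expects literal values, not a column from a different DataFrame, which is why the direct translation of the pandas code fails. A commonly used workaround (a sketch, not part of the accepted answer) is to collect df_A's col1 values into a Python list first and pass that list to isin(). Simulated here without a running Spark cluster:

```python
# The collect-then-isin workaround, sketched without Spark.
# With Spark, the filter would look like (hypothetical, small df_A only,
# since collect() pulls all values to the driver):
#   values = [r.col1 for r in df_A.select('col1').distinct().collect()]
#   df_B.filter(df_B['col1'].isin(values))
# Here, the result of collect() is simulated from the example data:
collected = ['A', 'B', 'C', 'D']  # distinct col1 values of df_A
matches = [c for c in ['C', 'E', 'D', 'C', 'F', 'G', 'H'] if c in collected]
print(matches)  # ['C', 'D', 'C']
```

The left semi join is usually preferable, since it stays entirely inside Spark and scales to large df_A.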