 

PySpark Dynamic Join Condition

I have a list of primary-key (PK) columns. I keep them in a list because the number of primary-key columns can differ from table to table. I want to join two DataFrames on the columns in pk_list.

pk_list = ['col1', 'col2', ..., 'coln']

Right now my code looks like this:

full_load_tbl_nc = full_load_tbl.join(delta_load_tbl, (col(f) == col(s) for (f,s) in zip(pk_list,pk_list) ) , "leftanti")

When I run the code I get this error:

File "/mnt/yarn/usercache/root/appcache/application_1544185829274_0001/container_1544185829274_0001_01_000001/pyspark.zip/pyspark/sql/dataframe.py", line 818, in join
AssertionError: on should be Column or list of Column

Casting the list to a pyspark.sql.Column with col(pk_list) (after from pyspark.sql.functions import col) also fails.
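The assertion comes from the type check on the `on` argument of `DataFrame.join`. The sketch below is a simplified model of that check as it appears in PySpark 2.x, not the actual source. `FakeColumn` and `normalize_on` are hypothetical stand-ins so the example runs without Spark. A generator is not a list, so it gets wrapped as a single element, and that element is not a Column.

```python
# Simplified model of the `on` check in PySpark 2.x DataFrame.join.
# FakeColumn is a hypothetical stand-in for pyspark.sql.Column.
class FakeColumn:
    pass


def normalize_on(on):
    # Anything that is not already a list is wrapped in a one-element list.
    if on is not None and not isinstance(on, list):
        on = [on]
    # Column names (strings) are accepted as-is; otherwise the first
    # element must be a Column. A wrapped generator fails here.
    if on is not None and not isinstance(on[0], str):
        assert isinstance(on[0], FakeColumn), "on should be Column or list of Column"
    return on


try:
    normalize_on(FakeColumn() for _ in range(2))  # generator, as in the question
except AssertionError as err:
    print("generator rejected:", err)

print("list accepted:", len(normalize_on([FakeColumn(), FakeColumn()])))
```

Turning the generator expression into a list comprehension (square brackets instead of parentheses) is enough to get past this check.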

Uraish, asked Jun 23 '26 17:06


1 Answer

You need to pass the join condition as a list of Column expressions, not as a generator. Try the code below:

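from pyspark.sql.functions import col
DF1_Columns = ['col1', 'col2']
DF2_Columns = ['Col11', 'Col22']
# The last argument is the join type, e.g. "inner" or "leftanti"
result = DF1.join(DF2, [col(f) == col(s) for (f, s) in zip(DF1_Columns, DF2_Columns)], "inner")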
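When `on` is a list of Column conditions, `DataFrame.join` combines them with AND. If you would rather build that single condition yourself, or want each column qualified by its DataFrame, a small helper can do it explicitly. This is a sketch: `build_join_condition` is a hypothetical name, not part of PySpark. It relies only on PySpark's `Column` overloading `==` and `&`.

```python
from functools import reduce
import operator


def build_join_condition(left, right, left_cols, right_cols):
    """AND together left[l] == right[r] for each pair of key columns.

    Hypothetical helper. With PySpark DataFrames, left[l] and right[r] are
    Column objects, so == builds an equality Column and & combines them.
    """
    if len(left_cols) != len(right_cols):
        raise ValueError("key lists must have the same length")
    if not left_cols:
        raise ValueError("at least one key column is required")
    conditions = [left[l] == right[r] for l, r in zip(left_cols, right_cols)]
    # operator.and_(a, b) calls a & b, i.e. Column.__and__ for PySpark
    return reduce(operator.and_, conditions)
```

Usage would look like `DF1.join(DF2, build_join_condition(DF1, DF2, DF1_Columns, DF2_Columns), "inner")`. Qualifying columns through `DF1[...]` and `DF2[...]` also keeps the condition unambiguous when both sides share column names.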

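If the key columns have the same names in both DataFrames, qualify each column with its DataFrame. A bare col(column) == col(column) is ambiguous, because that name exists on both sides of the join:

result = DF1.join(DF2, [DF1[c] == DF2[c] for c in DF1_Columns], "inner")

For same-named keys you can also pass the list of column names directly, which performs an equi-join on those columns:

result = DF1.join(DF2, DF1_Columns, "inner")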
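For the question's own use case (dropping rows of full_load_tbl whose primary key appears in delta_load_tbl), one PySpark call that does this is `full_load_tbl.join(delta_load_tbl, pk_list, "leftanti")`. The sketch below is a plain-Python model of what that left-anti join computes for an arbitrary key list. `left_anti` and the sample rows are hypothetical and only serve as an illustration. Nulls are modeled as `None` that never matches, as with SQL equality.

```python
def left_anti(left_rows, right_rows, keys):
    """Return rows of left_rows whose key has no match in right_rows.

    Plain-Python model of a left-anti equi-join on a dynamic key list.
    Rows are dicts purely for illustration. As with SQL equality, a key
    containing None (null) never matches, so such left rows are kept.
    """
    def key_of(row):
        return tuple(row[k] for k in keys)

    right_keys = {key_of(r) for r in right_rows if None not in key_of(r)}
    return [
        row for row in left_rows
        if None in key_of(row) or key_of(row) not in right_keys
    ]


pk_list = ["col1", "col2"]
full_load = [
    {"col1": 1, "col2": "a", "val": 10},
    {"col1": 2, "col2": "b", "val": 20},
    {"col1": None, "col2": "c", "val": 30},
]
delta_load = [{"col1": 2, "col2": "b", "val": 99}]
print(left_anti(full_load, delta_load, pk_list))
```

Because the key is built from whatever names are in `keys`, the same function works unchanged when a table has one, two, or more primary-key columns.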
SagarS, answered Jun 29 '26 12:06