Spark-SQL: Joining two dataframes/datasets with the same column name

I have the two datasets below:

controlSetDF : has columns loan_id, merchant_id, loan_type, created_date, as_of_date
accountDF : has columns merchant_id, id, name, status, merchant_risk_status

I am using the Java Spark API to join them, and I need only specific columns in the final dataset:

private String[] control_set_columns = {"loan_id", "merchant_id", "loan_type"};
private String[] sf_account_columns = {"id as account_id", "name as account_name", "merchant_risk_status"};

controlSetDF.selectExpr(control_set_columns)
    .join(accountDF.selectExpr(sf_account_columns),
          controlSetDF.col("merchant_id").equalTo(accountDF.col("merchant_id")),
          "left_outer");

But I get the error below:

org.apache.spark.sql.AnalysisException: resolved attribute(s) merchant_id#3L missing from account_name#131,loan_type#105,account_id#130,merchant_id#104L,loan_id#103,merchant_risk_status#2 in operator !Join LeftOuter, (merchant_id#104L = merchant_id#3L);;!Join LeftOuter, (merchant_id#104L = merchant_id#3L)

There seems to be an issue because both dataframes have a merchant_id column.

NOTE: If I don't use .selectExpr(), it works fine, but then the result shows all columns from both datasets.

asked Oct 18 '22 by NewQueries


1 Answer

If the join columns are named the same in both DataFrames, you can simply pass the column name(s) as the join condition. In Scala it's a bit cleaner; in Java you need to convert a Java List to a Scala Seq:

Seq<String> joinColumns = scala.collection.JavaConversions
  .asScalaBuffer(Lists.newArrayList("merchant_id"));

controlSetDF.selectExpr(control_set_columns)
  .join(accountDF.selectExpr(sf_account_columns), joinColumns, "left_outer");

This will result in a DataFrame that contains the join column only once.
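
Note that for the name-based join to resolve, merchant_id has to survive both selectExpr calls, so it must also be kept in sf_account_columns (the question's version drops it on the account side). Below is a minimal end-to-end sketch of this approach; it assumes a Spark 2.x Java API, Guava's Lists, and the column names from the question, and it adds merchant_id to the account-side selection for that reason:

import com.google.common.collect.Lists;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import scala.collection.JavaConversions;
import scala.collection.Seq;

// Keep merchant_id on the account side so the name-based join can resolve it in both DataFrames.
String[] control_set_columns = {"loan_id", "merchant_id", "loan_type"};
String[] sf_account_columns =
    {"merchant_id", "id as account_id", "name as account_name", "merchant_risk_status"};

// Convert the Java list of join column names to the Scala Seq expected by join().
Seq<String> joinColumns = JavaConversions.asScalaBuffer(Lists.newArrayList("merchant_id"));

// Left outer join on merchant_id; the result contains merchant_id only once.
Dataset<Row> result = controlSetDF.selectExpr(control_set_columns)
    .join(accountDF.selectExpr(sf_account_columns), joinColumns, "left_outer");

result.show();

On newer Scala/Spark versions, scala.collection.JavaConverters is generally preferred, since JavaConversions is deprecated.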

answered Oct 21 '22 by Silvio