I have the two datasets below:
controlSetDF : has columns loan_id, merchant_id, loan_type, created_date, as_of_date
accountDF : has columns merchant_id, id, name, status, merchant_risk_status
I am using the Spark Java API to join them, and I need only specific columns in the final dataset:
private String[] control_set_columns = {"loan_id", "merchant_id", "loan_type"};
private String[] sf_account_columns = {"id as account_id", "name as account_name", "merchant_risk_status"};
controlSetDF.selectExpr(control_set_columns)
    .join(accountDF.selectExpr(sf_account_columns),
          controlSetDF.col("merchant_id").equalTo(accountDF.col("merchant_id")),
          "left_outer");
But I get the error below:
org.apache.spark.sql.AnalysisException: resolved attribute(s) merchant_id#3L missing from account_name#131,loan_type#105,account_id#130,merchant_id#104L,loan_id#103,merchant_risk_status#2 in operator !Join LeftOuter, (merchant_id#104L = merchant_id#3L);;!Join LeftOuter, (merchant_id#104L = merchant_id#3L)
There seems to be an issue because both DataFrames have a merchant_id column.
NOTE: If I don't use .selectExpr(), it works fine, but then the result shows all columns from both datasets.
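For reference, here is a sketch of the variant that works: joining the original DataFrames first and selecting the needed columns afterwards, with aliases mirroring the selectExpr strings above. Qualifying each column by its source DataFrame avoids the ambiguity:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Join first, then project only the columns we need, each qualified
// by the DataFrame it comes from.
Dataset<Row> joined = controlSetDF
        .join(accountDF,
              controlSetDF.col("merchant_id").equalTo(accountDF.col("merchant_id")),
              "left_outer")
        .select(controlSetDF.col("loan_id"),
                controlSetDF.col("merchant_id"),
                controlSetDF.col("loan_type"),
                accountDF.col("id").alias("account_id"),
                accountDF.col("name").alias("account_name"),
                accountDF.col("merchant_risk_status"));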
If the join columns are named the same in both DataFrames, you can simply pass the column name as the join condition. In Scala this is a bit cleaner; in Java you need to convert a Java List to a Scala Seq:
// Lists is com.google.common.collect.Lists (Guava); Seq is scala.collection.Seq.
Seq<String> joinColumns = scala.collection.JavaConversions
        .asScalaBuffer(Lists.newArrayList("merchant_id"));

controlSetDF.selectExpr(control_set_columns)
        .join(accountDF.selectExpr(sf_account_columns), joinColumns, "left_outer");
This will result in a DataFrame with only one merchant_id column. Note that merchant_id must also appear in sf_account_columns; otherwise the join column cannot be resolved on the right-hand side after the selectExpr projection.
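Putting it together, here is a minimal self-contained sketch (assuming Spark 2.x with Guava on the classpath; the DataFrame and column names are taken from the question):

import com.google.common.collect.Lists;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import scala.collection.JavaConversions;
import scala.collection.Seq;

String[] controlSetColumns = {"loan_id", "merchant_id", "loan_type"};
// Keep merchant_id in the right-hand projection so the join column
// can be resolved on both sides.
String[] sfAccountColumns =
        {"merchant_id", "id as account_id", "name as account_name", "merchant_risk_status"};

Seq<String> joinColumns =
        JavaConversions.asScalaBuffer(Lists.newArrayList("merchant_id"));

// Left-outer join on merchant_id; the result carries a single merchant_id column.
Dataset<Row> result = controlSetDF.selectExpr(controlSetColumns)
        .join(accountDF.selectExpr(sfAccountColumns), joinColumns, "left_outer");

result.printSchema();

On Scala 2.12+, JavaConversions is deprecated; scala.collection.JavaConverters.asScalaBufferConverter(list).asScala() is the supported way to get the same Seq.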