I have the following two PySpark dataframes:
> df_lag_pre.columns
['date','sku','name','country','ccy_code','quantity','usd_price','usd_lag','lag_quantity']
> df_unmatched.columns
['alt_sku', 'alt_lag_quantity', 'country', 'ccy_code', 'name', 'usd_price']
Now I want to join them on common columns, so I try the following:
> df_lag_pre.join(df_unmatched, on=['name','country','ccy_code','usd_price'])
And I get the following error message:
AnalysisException: u'resolved attribute(s) price#3424 missing from country#3443,month#801,price#808,category#803,subcategory#804,page#805,date#280,link#809,name#806,quantity#807,ccy_code#3439,sku#3004,day#802 in operator !EvaluatePython PythonUDF#<lambda>(ccy_code#3439,price#3424), pythonUDF#811: string;'
Some of the columns that show up in this error, such as price, were part of another dataframe from which df_lag was built. I can't find any info on how to interpret this message, so any help would be greatly appreciated.
You can perform the join this way in PySpark. Aliasing each dataframe lets you reference every join column explicitly through its alias, which avoids ambiguous or unresolved attribute references. Please see if this is useful for you:
df_lag_pre.alias("df1")
df_unmatched.alias("df2")
join_both = df1.join(df2, (col("df1.name") == col("df2.name")) & (col("df1.country") == col("df2.country")) & (col("df1.ccy_code") == col("df2.ccy_code")) & (col("df1.usd_price") == col("df2.usd_price")), 'inner')
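After the join, columns that exist in both dataframes can be picked unambiguously through the aliases. A minimal sketch (the selected columns below are just examples taken from your two schemas):

from pyspark.sql.functions import col

# Select disambiguated columns from either side via the aliases
result = join_both.select(
    col("df1.date"),
    col("df1.sku"),
    col("df2.alt_sku"),
    col("df1.name"),
    col("df1.usd_price"),
)
result.show()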
Update: If you get a "col is not defined" error, add the following import:
from pyspark.sql.functions import col
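As for interpreting the AnalysisException itself: the numbered attributes (price#3424 and so on) are Spark's internal column IDs, and "resolved attribute(s) missing" usually means the query plan still references a column from a dataframe earlier in the lineage, here one that passed through a Python UDF. One common workaround, sketched below under the assumption that df_lag_pre is the side carrying the stale reference, is to re-select its columns by name so the join starts from a fresh set of attributes:

from pyspark.sql.functions import col

# Rebuild df_lag_pre from its own column names; this drops stale
# references inherited from the dataframe it was derived from.
df_lag_fresh = df_lag_pre.select([col(c) for c in df_lag_pre.columns])

joined = df_lag_fresh.join(
    df_unmatched,
    on=['name', 'country', 'ccy_code', 'usd_price'],
    how='inner',
)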