val rdd = sc.parallelize(Seq(
  ("vskp", Array(2.0, 1.0, 2.1, 5.4)),
  ("hyd", Array(1.5, 0.5, 0.9, 3.7)),
  ("hyd", Array(1.5, 0.5, 0.9, 3.2)),
  ("tvm", Array(8.0, 2.9, 9.1, 2.5))))
val df1 = rdd.toDF("id", "vals")
val rdd1 = sc.parallelize(Seq(("vskp", "ap"), ("hyd", "tel"), ("bglr", "kkt")))
val df2 = rdd1.toDF("id", "state")
val df3 = df1.join(df2, df1("id") === df2("id"), "left")
The join above works fine, but when I reuse df2 in another join I get an unresolved attributes error:
val rdd2 = sc.parallelize(Seq(("vskp", "Y"), ("hyd", "N"), ("hyd", "N"), ("tvm", "Y")))
val df4 = rdd2.toDF("id", "existance")
val df5 = df4.join(df2, df4("id") === df2("id"), "left")
ERROR: org.apache.spark.sql.AnalysisException: resolved attribute(s)id#426
As mentioned in my comment, this is related to https://issues.apache.org/jira/browse/SPARK-10925 and, more specifically, https://issues.apache.org/jira/browse/SPARK-14948. Reusing the same DataFrame reference makes the column references ambiguous, so you will have to clone the DataFrame before joining; see the last comment in https://issues.apache.org/jira/browse/SPARK-14948 for an example.
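A minimal sketch of what "cloning" could look like for the DataFrames in the question. It rebuilds df2 with toDF over its own column names, which should give the copy its own column references. The SparkSession setup and the name df2Clone are assumptions for illustration (the question uses the older sc entry point), and this is not necessarily the exact workaround given in the JIRA comment.

```scala
import org.apache.spark.sql.SparkSession

// Assumed setup: a local SparkSession. In spark-shell, `spark` already exists
// and the builder line can be skipped.
val spark = SparkSession.builder().appName("clone-before-join").master("local[*]").getOrCreate()
import spark.implicits._

// Same data as df2 and df4 in the question
val df2 = Seq(("vskp", "ap"), ("hyd", "tel"), ("bglr", "kkt")).toDF("id", "state")
val df4 = Seq(("vskp", "Y"), ("hyd", "N"), ("hyd", "N"), ("tvm", "Y")).toDF("id", "existance")

// "Clone" df2: same data and column names, rebuilt as a new DataFrame
// (df2Clone is a hypothetical name)
val df2Clone = df2.toDF(df2.columns: _*)

// Join against the clone instead of the reused df2 reference
val df5 = df4.join(df2Clone, df4("id") === df2Clone("id"), "left")
df5.show()
```

Note that the result still contains two columns named id, one from each side of the join.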
If you have df1, and df2 is derived from df1, try renaming the columns in df2 so that no two columns have the same name after the join. So, instead of calling df1.join(df2... directly, do:
# Step 1: rename the shared column names in df2.
df2_renamed = (
    df2.withColumnRenamed('columna', 'column_a_renamed')
       .withColumnRenamed('columnb', 'column_b_renamed')
)
# Step 2: do the join on the renamed df2 so that no two columns have the same name.
df1.join(df2_renamed)
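Applied to the Scala DataFrames from the question, the same renaming idea might look like the sketch below. The names df2Renamed and state_id are hypothetical, and the SparkSession setup is an assumption; treat it as an illustration of the approach rather than a verified fix for every Spark version.

```scala
import org.apache.spark.sql.SparkSession

// Assumed setup: a local SparkSession. In spark-shell, `spark` already exists
// and the builder line can be skipped.
val spark = SparkSession.builder().appName("rename-before-join").master("local[*]").getOrCreate()
import spark.implicits._

// Same data as df2 and df4 in the question
val df2 = Seq(("vskp", "ap"), ("hyd", "tel"), ("bglr", "kkt")).toDF("id", "state")
val df4 = Seq(("vskp", "Y"), ("hyd", "N"), ("hyd", "N"), ("tvm", "Y")).toDF("id", "existance")

// Step 1: rename the shared column in df2 (state_id is a hypothetical name)
val df2Renamed = df2.withColumnRenamed("id", "state_id")

// Step 2: join on the renamed column, then drop the now-redundant key
val df5 = df4
  .join(df2Renamed, df4("id") === df2Renamed("state_id"), "left")
  .drop("state_id")
df5.show()
```

Because the key column has a different name on each side, the joined result has no duplicate column names, and dropping state_id leaves only id, existance, and state.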