 

How to resolve the AnalysisException: resolved attribute(s) in Spark

val rdd = sc.parallelize(Seq(
  ("vskp", Array(2.0, 1.0, 2.1, 5.4)),
  ("hyd",  Array(1.5, 0.5, 0.9, 3.7)),
  ("hyd",  Array(1.5, 0.5, 0.9, 3.2)),
  ("tvm",  Array(8.0, 2.9, 9.1, 2.5))))
val df1 = rdd.toDF("id", "vals")

val rdd1 = sc.parallelize(Seq(("vskp", "ap"), ("hyd", "tel"), ("bglr", "kkt")))
val df2 = rdd1.toDF("id", "state")

val df3 = df1.join(df2, df1("id") === df2("id"), "left")

The join works fine, but when I reuse df2 in a second join I get an AnalysisException about resolved attributes:

val rdd2 = sc.parallelize(Seq(("vskp", "Y"), ("hyd", "N"), ("hyd", "N"), ("tvm", "Y")))
val df4 = rdd2.toDF("id", "existance")
val df5 = df4.join(df2, df4("id") === df2("id"), "left")

ERROR: org.apache.spark.sql.AnalysisException: resolved attribute(s) id#426
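The `#426` suffix in the message is the internal expression ID Spark assigns to the `id` attribute. A minimal sketch for looking at these IDs, assuming a local `SparkSession` and a hypothetical helper `idsOf`: every use of the same DataFrame reference exposes the same IDs, while a DataFrame built separately from the same data gets new ones.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("expr-id-sketch").getOrCreate()
import spark.implicits._

val df2 = Seq(("vskp", "ap"), ("hyd", "tel"), ("bglr", "kkt")).toDF("id", "state")

// Hypothetical helper: list the attributes of the analyzed plan as name#exprId
def idsOf(df: DataFrame): Seq[String] =
  df.queryExecution.analyzed.output.map(a => s"${a.name}#${a.exprId.id}")

// Reusing the df2 reference always yields the same IDs
println(idsOf(df2))

// Building an equivalent DataFrame from scratch yields different IDs
val df2Rebuilt = Seq(("vskp", "ap"), ("hyd", "tel"), ("bglr", "kkt")).toDF("id", "state")
println(idsOf(df2Rebuilt))
```

The exact numbers differ from run to run; only the fact that they are shared or distinct matters.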

asked Aug 16 '17 at 12:08 by Rajita



2 Answers

As mentioned in my comment, this is related to https://issues.apache.org/jira/browse/SPARK-10925 and, more specifically, https://issues.apache.org/jira/browse/SPARK-14948. Reusing the same DataFrame reference creates ambiguity in attribute naming, so you have to clone the DataFrame; see the last comment in https://issues.apache.org/jira/browse/SPARK-14948 for an example.
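One commonly used way to "clone" a DataFrame is to rebuild it from its RDD and schema, which gives the new logical plan fresh attribute IDs. This is a sketch under assumptions (a local `SparkSession`, and a hypothetical helper named `cloneDF`); it is not necessarily the exact code from the JIRA comment.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("clone-df-sketch").getOrCreate()
import spark.implicits._

// DataFrames from the question
val df1 = Seq(
  ("vskp", Array(2.0, 1.0, 2.1, 5.4)), ("hyd", Array(1.5, 0.5, 0.9, 3.7)),
  ("hyd", Array(1.5, 0.5, 0.9, 3.2)), ("tvm", Array(8.0, 2.9, 9.1, 2.5))).toDF("id", "vals")
val df2 = Seq(("vskp", "ap"), ("hyd", "tel"), ("bglr", "kkt")).toDF("id", "state")
val df4 = Seq(("vskp", "Y"), ("hyd", "N"), ("hyd", "N"), ("tvm", "Y")).toDF("id", "existance")

// Hypothetical helper: rebuild df from its rows and schema so that the new
// plan does not share attribute IDs with the original df
def cloneDF(df: DataFrame): DataFrame =
  df.sparkSession.createDataFrame(df.rdd, df.schema)

// First join uses df2 directly
val df3 = df1.join(df2, df1("id") === df2("id"), "left")

// Second join uses a clone instead of the same df2 reference
val df2Clone = cloneDF(df2)
val df5 = df4.join(df2Clone, df4("id") === df2Clone("id"), "left")
df5.show()
```

Going through `df.rdd` converts the data to rows and cuts the plan lineage, so it can cost some performance on large inputs; renaming columns (as in the other answer) avoids that round trip.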

answered Sep 28 '22 at 20:09 by Erik Schmiegelow



If you have df1, and df2 is derived from df1, try renaming all columns in df2 so that no two columns have the same name after the join. So instead of df1.join(df2, ...), do the following before the join:

# Step 1: rename the column names df2 shares with df1
df2_renamed = (df2
    .withColumnRenamed('columna', 'column_a_renamed')
    .withColumnRenamed('columnb', 'column_b_renamed'))

# Step 2: join on the renamed df2, so no two columns have the same name
# (join key uses the placeholder column names above)
df1.join(df2_renamed, df1['columna'] == df2_renamed['column_a_renamed'])
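The snippet above is PySpark. A minimal Scala sketch of the same two steps, assuming a local `SparkSession`, with df2 derived from df1 through a filter; the renamed column names `id_renamed` and `state_renamed` are illustrative only.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("rename-before-join-sketch").getOrCreate()
import spark.implicits._

val df1 = Seq(("vskp", "ap"), ("hyd", "tel"), ("bglr", "kkt")).toDF("id", "state")
// df2 is derived from df1, so its columns carry df1's attribute IDs
val df2 = df1.filter($"state" =!= "kkt")

// Step 1: rename every column df2 shares with df1
val df2Renamed = df2
  .withColumnRenamed("id", "id_renamed")
  .withColumnRenamed("state", "state_renamed")

// Step 2: join on the renamed DataFrame; no two output columns share a name
val joined = df1.join(df2Renamed, df1("id") === df2Renamed("id_renamed"), "left")
joined.show()
```

Because the renamed columns are new aliases, the join output has four distinct column names and can be referenced without qualifying by DataFrame.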
answered Sep 28 '22 at 21:09 by Tomer Ben David
