I have a Spark DataFrame where one of the fields is of MapType. I can fetch data for any of the keys of the MapType field, but I am unable to do so when I apply a filter for a specific value of a specific key.
val line = List(("Sanjay", Map("one" -> 1, "two" -> 2)), ("Taru", Map("one" -> 10, "two" -> 20)))
I created an RDD and a DataFrame from the above List (setup sketched below), and am trying to fetch, from the DataFrame, the Map values where the value is >= 5. But I get the exception below in the Spark REPL. Kindly help.
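A minimal sketch of that setup in the 1.x shell (the column names "name" and "data" are assumptions, not the original code):

import sqlContext.implicits._  // sqlContext and sc are predefined in the Spark 1.x shell
val rowrddDF = sc.parallelize(line).toDF("name", "data")  // "name"/"data" are assumed column names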
val rowrddDFFinal = rowrddDF.select(rowrddDF("data.one").alias("data")).filter(rowrddDF("data.one").geq(5))
org.apache.spark.sql.AnalysisException: resolved attribute(s) data#1 missing from data#3 in operator !Filter (data#1[one] AS one#4 >= 5);
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:121)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42)
    at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931)
To access values from an Array or Map column you can use the Column.getItem method:
rowrddDF
.where($"data".getItem("one").geq(5))
.select($"data".getItem("one").alias("data"))
If you prefer to filter after the select, you can no longer use rowrddDF.apply: once the column is aliased, the attribute from the original DataFrame is gone from the plan (hence the "resolved attribute(s) data#1 missing" error above). Instead you should access the aliased column directly:
rowrddDF
  .select($"data".getItem("one").alias("data"))
  .filter($"data".geq(5))