Is Spark DataFrame nested structure limited for selection?

I have a JSON file with some data. I can create a DataFrame from it, and the schema for the part I am interested in looks like this:

val json: DataFrame = sqlc.load("entities_with_address2.json", "json")

root
 |-- attributes: struct (nullable = true)
 |    |-- Address2: array (nullable = true)
 |    |    |-- value: struct (nullable = true)
 |    |    |    |-- Zip: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- value: struct (nullable = true)
 |    |    |    |    |    |    |-- Zip5: array (nullable = true)
 |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |    |-- value: string (nullable = true)

When I try to select just the deepest field:

json.select("attributes.Address2.value.Zip.value.Zip5").collect()

I get an exception:

org.apache.spark.sql.AnalysisException: GetField is not valid on fields of type ArrayType(ArrayType(StructType(StructField(value, StructType(StructField(Zip5, ArrayType(StructType(StructField(value, StringType, true)), true), true)), true)), true), true);

Looking at the resolveGetField method of LogicalPlan, I see that it is possible to select from a StructType or from an ArrayType(StructType). Is there any way to select deeper? How can I select the field I need?

Here is the full stack trace:

    org.apache.spark.sql.AnalysisException: GetField is not valid on fields of type ArrayType(ArrayType(StructType(StructField(value,StructType(StructField(Zip5,ArrayType(StructType(StructField(value,StringType,true)),true),true)),true)),true),true);
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveGetField(LogicalPlan.scala:265)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$3.apply(LogicalPlan.scala:214)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$3.apply(LogicalPlan.scala:214)
        at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
        at scala.collection.immutable.List.foldLeft(List.scala:84)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:214)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:117)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$1.applyOrElse(CheckAnalysis.scala:50)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$1.applyOrElse(CheckAnalysis.scala:46)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:252)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:252)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:251)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionUp$1(QueryPlan.scala:108)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2$$anonfun$apply$2.apply(QueryPlan.scala:123)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:122)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:127)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:46)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:44)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:89)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:44)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:40)
        at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:1080)
        at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
        at org.apache.spark.sql.DataFrame.logicalPlanToDataFrame(DataFrame.scala:157)
        at org.apache.spark.sql.DataFrame.select(DataFrame.scala:476)
        at org.apache.spark.sql.DataFrame.select(DataFrame.scala:491)
        at com.reltio.analytics.PREDF.test(PREDF.scala:55)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74)
        at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211)
        at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67)

asked May 28 '15 08:05 by evgenii


1 Answer

The problem is the ArrayType. You can reproduce this error very simply:

val df = Seq(Tuple1(Array[String]())).toDF("users")

At which point df.printSchema shows:

root
 |-- users: array (nullable = true)
 |    |-- element: string (containsNull = true)

And now if you try:

df.select($"users.element")

You get the exact same exception: GetField is not valid...

You have a couple of different options to unwind the Array. You can get at individual items with getItem like this:

df.select($"users".getItem(0))

And since getItem returns another Column, you can dig as deep as you want:

df.select($"attributes.Address2".getItem(0).getField("value").getField("Zip").getItem(...)
// etc
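
As a rough sketch of that chain written out in full for the schema in the question: the version below assumes a newer Spark (SparkSession and spark.read.json on a Dataset[String], so 2.2 or later) rather than the 1.3 API used above. The object name FirstZip5, the one-record sample JSON, and the choice of index 0 at every array level are all made up for illustration.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.col

object FirstZip5 {
  // Hypothetical one-record sample shaped like the schema in the question.
  val sample: String =
    """{"attributes":{"Address2":[{"value":{"Zip":[{"value":{"Zip5":[{"value":"94105"}]}}]}}]}}"""

  // Alternate getItem (array index) and getField (struct field) at each level.
  // Index 0 is an arbitrary choice: this reads only the first element of each array.
  val firstZip5: Column =
    col("attributes.Address2").getItem(0)
      .getField("value").getField("Zip").getItem(0)
      .getField("value").getField("Zip5").getItem(0)
      .getField("value")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("first-zip5").getOrCreate()
    import spark.implicits._

    val json = spark.read.json(Seq(sample).toDS())
    json.select(firstZip5.as("zip5")).show()

    spark.stop()
  }
}
```

This only ever reaches one element per array, which is why the whole-array approach below is usually what you want.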

With an array, though, you probably want to unwind the whole thing programmatically. Hive handles this with a LATERAL VIEW; in Spark, you use explode to get the equivalent:

case class User(name: String)
df.explode($"users"){ case Row(arr: Array[String]) =>  arr.map(User(_)) }

Note that I use a case class in my map, which is what the docs show. If you don't want to create a case class, you can return a Tuple1 (or Tuple2, Tuple3, etc.) instead:

df.explode($"users"){ case Row(arr: Array[String]) =>  arr.map(Tuple1(_)) }
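
For the nested schema in the question, one way to unwind every array level is sketched below. It is not the DataFrame.explode method used above (which was deprecated in Spark 2.0) but the explode function from org.apache.spark.sql.functions, applied once per array. It assumes Spark 2.2 or later; the object name AllZip5, the helper allZip5, the intermediate aliases, and the sample record are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode}

object AllZip5 {
  // Hypothetical sample shaped like the schema in the question, with two Zip5 entries.
  val sample: String =
    """{"attributes":{"Address2":[{"value":{"Zip":[{"value":{"Zip5":[{"value":"94105"},{"value":"94107"}]}}]}}]}}"""

  // Each explode turns one array level into one row per element,
  // so the next select only has to step through plain struct fields.
  def allZip5(json: DataFrame): DataFrame =
    json
      .select(explode(col("attributes.Address2")).as("addr"))
      .select(explode(col("addr.value.Zip")).as("zip"))
      .select(explode(col("zip.value.Zip5")).as("zip5"))
      .select(col("zip5.value").as("zip5"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("all-zip5").getOrCreate()
    import spark.implicits._

    val json = spark.read.json(Seq(sample).toDS())
    allZip5(json).show()

    spark.stop()
  }
}
```

Keep in mind that explode produces no output row for a null or empty array, so records without any Zip5 entries drop out of the result.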

answered Jan 03 '23 10:01 by David Griffin