Running through the spark-csv README, there's sample Java code like this:
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.*;
SQLContext sqlContext = new SQLContext(sc);
StructType customSchema = new StructType(
    new StructField("year", IntegerType, true),
    new StructField("make", StringType, true),
    new StructField("model", StringType, true),
    new StructField("comment", StringType, true),
    new StructField("blank", StringType, true));

DataFrame df = sqlContext.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("cars.csv");

df.select("year", "model").write()
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("newcars.csv");
It didn't compile out of the box, so with some wrangling I got it to compile by changing the incorrect FooType
syntax to DataTypes.FooType
and by passing the StructFields as a new StructField[]
. The compiler then requested a fourth metadata argument
in the constructor of StructField
, but I had trouble finding documentation on what it means (the javadocs describe its use cases, but not how to decide what to pass in during StructField construction). With the following code, it now runs until it hits any materializing call such as collect()
:
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
// Read features.
System.out.println("Reading features from " + args[0]);
StructType featuresSchema = new StructType(new StructField[] {
    new StructField("case_id", DataTypes.StringType, false, null),
    new StructField("foo", DataTypes.DoubleType, false, null)
});

DataFrame features = sqlContext.read()
    .format("com.databricks.spark.csv")
    .schema(featuresSchema)
    .load(args[0]);

for (Row r : features.collect()) {
    System.out.println("Row: " + r);
}
I get the following Exception:
Exception in thread "main" java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.AttributeReference.hashCode(namedExpressions.scala:202)
at scala.runtime.ScalaRunTime$.hash(ScalaRunTime.scala:210)
at scala.collection.immutable.HashSet.elemHashCode(HashSet.scala:65)
at scala.collection.immutable.HashSet.computeHash(HashSet.scala:74)
at scala.collection.immutable.HashSet.$plus(HashSet.scala:56)
at scala.collection.immutable.HashSet.$plus(HashSet.scala:59)
at scala.collection.immutable.Set$Set4.$plus(Set.scala:127)
at scala.collection.immutable.Set$Set4.$plus(Set.scala:121)
at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:24)
at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:22)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
at scala.collection.SetLike$class.map(SetLike.scala:93)
at scala.collection.AbstractSet.map(Set.scala:47)
at org.apache.spark.sql.catalyst.expressions.AttributeSet.foreach(AttributeSet.scala:114)
at scala.collection.TraversableOnce$class.size(TraversableOnce.scala:105)
at org.apache.spark.sql.catalyst.expressions.AttributeSet.size(AttributeSet.scala:56)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:307)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:282)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:56)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:926)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:924)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:930)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:930)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
...
Any idea what's wrong?
It seems the README is very outdated and needs significant editing for the Java example. I tracked down the JIRA that actually added the metadata field; it relies on a default value of Map.empty
for the Scala case, and whoever wrote the documentation must have translated the Scala directly to Java despite Java having no way to express that default parameter value.
In the 1.5 branch of Spark SQL's code we can see that it references metadata.hashCode()
without a null check, which is what causes the NullPointerException
. The existence of the Metadata.empty() method, combined with the discussions about using empty maps as defaults in Scala, implies that the correct approach is to pass Metadata.empty()
if you don't care about metadata. The revised example should be:
SQLContext sqlContext = new SQLContext(sc);
StructType customSchema = new StructType(new StructField[] {
    new StructField("year", DataTypes.IntegerType, true, Metadata.empty()),
    new StructField("make", DataTypes.StringType, true, Metadata.empty()),
    new StructField("model", DataTypes.StringType, true, Metadata.empty()),
    new StructField("comment", DataTypes.StringType, true, Metadata.empty()),
    new StructField("blank", DataTypes.StringType, true, Metadata.empty())
});

DataFrame df = sqlContext.read()
    .format("com.databricks.spark.csv")
    .schema(customSchema)
    .option("header", "true")
    .load("cars.csv");

df.select("year", "model").write()
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("newcars.csv");
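For what it's worth, there are also helpers in org.apache.spark.sql.types that sidestep the null problem entirely. DataTypes.createStructField fills in Metadata.empty() for you, and MetadataBuilder is the intended way to construct non-empty metadata when you do care about it. A sketch (class and method names are from the Spark 1.5-era Java API; the field names are just illustrative):

```java
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SchemaExamples {
    public static void main(String[] args) {
        // The factory methods supply Metadata.empty() internally,
        // so the fourth constructor argument never comes up.
        StructType viaFactory = DataTypes.createStructType(new StructField[] {
            DataTypes.createStructField("year", DataTypes.IntegerType, true),
            DataTypes.createStructField("make", DataTypes.StringType, true)
        });

        // If you do want metadata, build it with MetadataBuilder instead of passing null.
        Metadata comment = new MetadataBuilder()
            .putString("comment", "model year of the car")
            .build();
        StructField year = new StructField("year", DataTypes.IntegerType, true, comment);

        System.out.println(viaFactory.simpleString());
        System.out.println(year.metadata().getString("comment"));
    }
}
```

Either way, every StructField ends up with a non-null Metadata instance, which is what the catalyst code path above assumes.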