Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Getting NullPointerException using spark-csv with DataFrames

Running through the spark-csv README there's sample Java code like this import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.types.*;

SQLContext sqlContext = new SQLContext(sc);
StructType customSchema = new StructType(
    new StructField("year", IntegerType, true), 
    new StructField("make", StringType, true),
    new StructField("model", StringType, true),
    new StructField("comment", StringType, true),
    new StructField("blank", StringType, true));

DataFrame df = sqlContext.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("cars.csv");

df.select("year", "model").write()
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("newcars.csv");

It didn't compile out of the box, so with some wrangling I got it to compile with changing the incorrect FooType syntax to DataTypes.FooType and passing the StructFields as a new StructField[]; the compiler requested a fourth argument for metadata in the constructor of StructField but I had trouble finding documentation on what it means (javadocs describe its use cases, but not really how to decide what to pass in during StructField construction). With the following code, it now runs until any side-effect method like collect():

JavaSparkContext sc = new JavaSparkContext(conf);

SQLContext sqlContext = new SQLContext(sc);

// Read features.
System.out.println("Reading features from " + args[0]);
StructType featuresSchema = new StructType(new StructField[] {
    new StructField("case_id", DataTypes.StringType, false, null), 
    new StructField("foo", DataTypes.DoubleType, false, null)
});
DataFrame features = sqlContext.read()
    .format("com.databricks.spark.csv")
    .schema(featuresSchema)
    .load(args[0]);
for (Row r : features.collect()) {
  System.out.println("Row: " + r);
}

I get the following Exception:

Exception in thread "main" java.lang.NullPointerException
  at org.apache.spark.sql.catalyst.expressions.AttributeReference.hashCode(namedExpressions.scala:202)
  at scala.runtime.ScalaRunTime$.hash(ScalaRunTime.scala:210)
  at scala.collection.immutable.HashSet.elemHashCode(HashSet.scala:65)
  at scala.collection.immutable.HashSet.computeHash(HashSet.scala:74)
  at scala.collection.immutable.HashSet.$plus(HashSet.scala:56)
  at scala.collection.immutable.HashSet.$plus(HashSet.scala:59)
  at scala.collection.immutable.Set$Set4.$plus(Set.scala:127)
  at scala.collection.immutable.Set$Set4.$plus(Set.scala:121)
  at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:24)
  at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:22)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
  at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
  at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
  at scala.collection.SetLike$class.map(SetLike.scala:93)
  at scala.collection.AbstractSet.map(Set.scala:47)
  at org.apache.spark.sql.catalyst.expressions.AttributeSet.foreach(AttributeSet.scala:114)
  at scala.collection.TraversableOnce$class.size(TraversableOnce.scala:105)
  at org.apache.spark.sql.catalyst.expressions.AttributeSet.size(AttributeSet.scala:56)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:307)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:282)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:56)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
  at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:926)
  at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:924)
  at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:930)
  at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:930)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
  at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
  at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
...

Any idea what's wrong?

like image 239
Dennis Huo Avatar asked Dec 21 '15 03:12

Dennis Huo


1 Answers

It seems the README is very outdated, and needs some significant editing for the Java example. I tracked down the actual JIRA which added the metadata field and it points at the usage of a default Map.empty value for Scala cases, and whoever wrote the documentation must have just translated the Scala directly to Java despite the lack of the same default value for the input parameter.

In the 1.5 branch of SparkSQL's code we can see that it references metadata.hashCode() without checking, which is what's causing the NullPointerException. The existence of the Metadata.empty() method combined with the discussions about using empty maps as default in Scala seem to imply the correct implementation is to go ahead and pass Metadata.empty() if you don't care about it. The revised example should be:

SQLContext sqlContext = new SQLContext(sc);
StructType customSchema = new StructType(new StructField[] {
    new StructField("year", DataTypes.IntegerType, true, Metadata.empty()), 
    new StructField("make", DataTypes.StringType, true, Metadata.empty()),
    new StructField("model", DataTypes.StringType, true, Metadata.empty()),
    new StructField("comment", DataTypes.StringType, true, Metadata.empty()),
    new StructField("blank", DataTypes.StringType, true, Metadata.empty())
});

DataFrame df = sqlContext.read()
    .format("com.databricks.spark.csv")
    .schema(customSchema)
    .option("header", "true")
    .load("cars.csv");

df.select("year", "model").write()
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("newcars.csv");
like image 52
Dennis Huo Avatar answered Oct 16 '22 15:10

Dennis Huo