How does Spark 2.0 handle column nullability?

In the recently released The Data Engineer's Guide to Apache Spark, the authors stated (page 74):

"...when you define a schema where all columns are declared to not have null values - Spark will not enforce that and will happily let null values into that column. The nullable signal is simply to help Spark SQL optimize for handling that column. If you have null values in columns that should not have null values, you can get an incorrect result or see strange exceptions that can be hard to debug."

However, going over notes and previous JIRAs, it appears that the statement above may no longer be true.

According to SPARK-13740 and SPARK-15192, it looks like nullability is enforced when a schema is defined at DataFrame creation.

Could I get some clarification? I'm no longer certain what the behavior is.
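
For concreteness, this is the kind of creation path I am asking about (the column names and values below are just placeholders I made up to test with):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    // A schema that declares the first column non-nullable.
    val testSchema = StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("value", IntegerType, nullable = true)))

    // Will Spark reject the null in the non-nullable "id" column, or let it through?
    val testRDD = spark.sparkContext.parallelize(Seq(Row(1, 2), Row(null, 3)))
    spark.createDataFrame(testRDD, testSchema).show()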

2 Answers

Different DataFrame creation paths handle nulls differently. It is not straightforward, because there are at least three distinct areas where nulls are handled in completely different ways.

  1. First, SPARK-15192 is about RowEncoders. In the case of RowEncoders, nulls are not allowed in non-nullable fields, and the error messages have been improved. Among the two dozen or so overloads of SparkSession.createDataFrame(), quite a few essentially convert an RDD to a DataFrame. In my example below no nulls were accepted; convert an RDD to a DataFrame with createDataFrame() as shown below and you will get the same result...

    val nschema = StructType(Seq(StructField("colA", IntegerType, nullable = false), StructField("colB", IntegerType, nullable = true), StructField("colC", IntegerType, nullable = false), StructField("colD", IntegerType, nullable = true)))
    val intNullsRDD = sc.parallelize(List(org.apache.spark.sql.Row(null,null,null,null),org.apache.spark.sql.Row(2,null,null,null),org.apache.spark.sql.Row(null,3,null,null),org.apache.spark.sql.Row(null,null,null,4)))
    spark.createDataFrame(intNullsRDD, schema).show()
    

In Spark 2.1.1, the error message is pretty nice.

17/11/23 21:30:37 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 6)
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: The 0th field 'colA' of input row cannot be null.
validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, colA), IntegerType) AS colA#73
+- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, colA), IntegerType)
   +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, colA)
      +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
         +- input[0, org.apache.spark.sql.Row, true]

Stepping through the code, you can see where this happens: the validation itself is in the doGenCode() method shown further down, and it is wired up when the RowEncoder object is created with val encoder = RowEncoder(schema) in the createDataFrame() overload immediately below.

    @DeveloperApi
    @InterfaceStability.Evolving
    def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = {
      createDataFrame(rowRDD, schema, needsConversion = true)
    }

    private[sql] def createDataFrame(
        rowRDD: RDD[Row],
        schema: StructType,
        needsConversion: Boolean) = {
      // TODO: use MutableProjection when rowRDD is another DataFrame and the applied
      // schema differs from the existing schema on any field data type.
      val catalystRows = if (needsConversion) {
        val encoder = RowEncoder(schema)
        rowRDD.map(encoder.toRow)
      } else {
        rowRDD.map { r: Row => InternalRow.fromSeq(r.toSeq) }
      }
      val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
      Dataset.ofRows(self, logicalPlan)
    }

Stepping through this logic further, here is the improved message in objects.scala, and this is where the code handles null values. Strictly speaking, the error message is passed in via ctx.addReferenceObj(errMsg), but you get the idea.

 case class GetExternalRowField(
    child: Expression,
    index: Int,
    fieldName: String) extends UnaryExpression with NonSQLExpression {

  override def nullable: Boolean = false
  override def dataType: DataType = ObjectType(classOf[Object])
  override def eval(input: InternalRow): Any =
    throw new UnsupportedOperationException("Only code-generated evaluation is supported")

  private val errMsg = s"The ${index}th field '$fieldName' of input row cannot be null."

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    // Use unnamed reference that doesn't create a local field here to reduce the number of fields
    // because errMsgField is used only when the field is null.
    val errMsgField = ctx.addReferenceObj(errMsg)
    val row = child.genCode(ctx)
    val code = s"""
      ${row.code}

      if (${row.isNull}) {
        throw new RuntimeException("The input external row cannot be null.");
      }

      if (${row.value}.isNullAt($index)) {
        throw new RuntimeException($errMsgField);
      }

      final Object ${ev.value} = ${row.value}.get($index);
     """
    ev.copy(code = code, isNull = "false")
  }
} 
  2. Something completely different happens when pulling from an HDFS data source. In this case there is no error message when a null arrives in a non-nullable column; the column still accepts null values. Check out the quick test file testFile.csv I created and then put into HDFS with hdfs dfs -put testFile.csv /data/nullTest:

       |colA|colB|colC|colD| 
       |    |    |    |    |
       |    |   2|   2|   2|
       |    |   3|    |    |
       |   4|    |    |    |
    

When I read that file back with a schema that declares colA and colC non-nullable (the nschema from above), all of the blank values came back as null, even in the non-nullable fields. There are ways to handle blanks differently, but this is the default behavior. CSV and Parquet gave the same result; the Parquet version is below, with a sketch of the CSV read after it.

import scala.collection.JavaConverters._

// Write with an all-nullable schema so the nulls actually land in the file, then read back with the non-nullable nschema.
val allNullableSchema = StructType(Seq(StructField("colA", IntegerType, nullable = true), StructField("colB", IntegerType, nullable = true), StructField("colC", IntegerType, nullable = true), StructField("colD", IntegerType, nullable = true)))
val jListNullsADF = spark.createDataFrame(List(Row(null, null, null, null), Row(2, null, null, null), Row(null, 3, null, null), Row(null, null, null, 4)).asJava, allNullableSchema)
jListNullsADF.write.format("parquet").save("/data/parquetnulltest")
spark.read.format("parquet").schema(nschema).load("/data/parquetnulltest").show()

+----+----+----+----+
|colA|colB|colC|colD|
+----+----+----+----+
|null|null|null|null|
|null|   2|   2|   2|
|null|null|   3|null|
|null|   4|null|   4|
+----+----+----+----+
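
And for completeness, a sketch of the CSV side, assuming the table above is saved as a plain comma-separated file with a header under /data/nullTest/testFile.csv (adjust the path and options to your setup):

    // Read the CSV back with nschema, which declares colA and colC as nullable = false.
    // The blank cells still come back as null.
    val csvDF = spark.read
      .format("csv")
      .option("header", "true")
      .schema(nschema)
      .load("/data/nullTest/testFile.csv")

    csvDF.show()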

The reason nulls are allowed here starts with the DataFrame creation: load() in DataFrameReader.scala calls baseRelationToDataFrame(). baseRelationToDataFrame() in SparkSession.scala goes through a QueryPlan, and the QueryPlan recreates the StructType with fromAttributes(), which rebuilds essentially the same schema as the original but from the plan's attributes, so the declared non-nullability is not carried through. Thus, by the time it reaches RowEncoder(), it is working with a nullable version of the original schema.

Immediately below in DataFrameReader.scala you can see the baseRelationToDataFrame() call...

  @scala.annotation.varargs
  def load(paths: String*): DataFrame = {
    sparkSession.baseRelationToDataFrame(
      DataSource.apply(
        sparkSession,
        paths = paths,
        userSpecifiedSchema = userSpecifiedSchema,
        className = source,
        options = extraOptions.toMap).resolveRelation())
  }

Immediately below, in SparkSession.scala, you can see the Dataset.ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan) method being called; pay close attention to the LogicalRelation plan constructor.

  def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
    Dataset.ofRows(self, LogicalRelation(baseRelation))
  }

In Dataset.scala, the analyzed QueryPlan object's schema property is being passed as the third argument to create the Dataset in new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema)).

  def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
    val qe = sparkSession.sessionState.executePlan(logicalPlan)
    qe.assertAnalyzed()
    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
  }

In QueryPlan.scala, the StructType.fromAttributes() method is being used:

 lazy val schema: StructType = StructType.fromAttributes(output)

And finally, in StructType.scala, fromAttributes() rebuilds each StructField from the plan's attributes, so the nullability no longer comes from the user-supplied schema:

  private[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
    StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))

As for the query plan being different based on nullability, I think it is entirely possible that the LogicalPlan differs depending on whether a column is nullable. A lot of information is passed into that object and there is a lot of subsequent logic to create the plan. But, as we saw a moment ago, the non-nullable declaration is not preserved when the DataFrame is actually written and read back.
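
A quick way to see that flattening for yourself is to compare the schema you pass in with the schema the DataFrame reports back; this is just a sketch against the parquet path from the example above:

    // nschema declares colA and colC as nullable = false, but the DataFrame read
    // back from the file source is expected (per the walk-through above) to report
    // every field as nullable = true.
    val readBack = spark.read.format("parquet").schema(nschema).load("/data/parquetnulltest")
    readBack.printSchema()
    readBack.schema.fields.foreach(f => println(s"${f.name} -> nullable = ${f.nullable}"))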

  3. The third case depends on the DataType. When you create a DataFrame using createDataFrame(rows: java.util.List[Row], schema: StructType), a null passed into a non-nullable IntegerType field actually becomes a zero. You can see the example below...

      val schema = StructType(Seq(StructField("colA", IntegerType, nullable = false), StructField("colB", IntegerType, nullable = true), StructField("colC", IntegerType, nullable = false), StructField("colD", IntegerType, nullable = true))) 
      val jListNullsDF = spark.createDataFrame(List(org.apache.spark.sql.Row(null,null,null,null),org.apache.spark.sql.Row(2,null,null,null),org.apache.spark.sql.Row(null,3,null,null),org.apache.spark.sql.Row(null,null,null,4)).asJava,schema)
      jListNullsDF.show() 
    
      +----+----+----+----+
      |colA|colB|colC|colD|
      +----+----+----+----+
      |   0|null|   0|null|
      |   2|null|   0|null|
      |   0|   3|   0|null|
      |   0|null|   0|   4|
      +----+----+----+----+
    

It looks like there is logic in org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getInt() that substitutes zeros for nulls. However, with non-nullable StringType fields, nulls are not handled as gracefully.

   val strschema = StructType(Seq(StructField("colA", StringType, nullable = false), StructField("colB", StringType, nullable = true), StructField("colC", StringType, nullable = false), StructField("colD", StringType, nullable = true)))
   val strNullsRDD = sc.parallelize(List(org.apache.spark.sql.Row(null,null,null,null),org.apache.spark.sql.Row("r2colA",null,null,null),org.apache.spark.sql.Row(null,"r3colC",null,null),org.apache.spark.sql.Row(null,null,null,"r4colD")))
spark.createDataFrame(List(org.apache.spark.sql.Row(null,null,null,null),org.apache.spark.sql.Row("r2cA",null,null,null),org.apache.spark.sql.Row(null,"row3cB",null,null),org.apache.spark.sql.Row(null,null,null,"row4ColD")).asJava,strschema).show()

But below is the not-very-helpful error message, which does not even specify the ordinal position of the field...

java.lang.NullPointerException
  at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:210)
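
This is not from the book or the JIRAs, but if you want to sidestep these surprises, one defensive pattern is to relax the schema to all-nullable up front and enforce the non-null expectation yourself; a rough sketch:

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.StructType

    // Copy a schema with every field marked nullable, so a declared non-null
    // constraint is never silently relied upon.
    def asAllNullable(schema: StructType): StructType =
      StructType(schema.fields.map(_.copy(nullable = true)))

    // Usage sketch with the string example above: build the DataFrame with the
    // relaxed schema, then inspect the rows that violate the intended constraint.
    val relaxedDF = spark.createDataFrame(strNullsList, asAllNullable(strschema))
    relaxedDF.filter(col("colA").isNull).show()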
Long story short, we don't know. It is true that Spark has become much stricter about enforcing declared nullability.

However, considering the complexity of Spark (the number of guest languages, the size of the library, the number of low-level mechanisms used for optimizations, pluggable data sources, and a relatively large pool of legacy code), there is really no guarantee that the fairly limited safety checks included in recent versions cover all possible scenarios.
