Spark 2 Dataset Null value exception

I'm getting this null error in Spark Dataset.filter.

Input CSV:


Working code:

case class Person(name: String, age: Long, stat: String)

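val peopleDS = spark.read.option("inferSchema","true")
  .option("header", "true").option("delimiter", ",")
  .csv(path).as[Person]  // path: placeholder for the input CSV location (not shown here)
peopleDS.createOrReplaceTempView("people")  // register the table queried below
spark.sql("select * from people where age > 30").show()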

Failing code (adding the following line returns an error):

val filteredDS = peopleDS.filter(_.age > 30)

Returns null error

java.lang.RuntimeException: Null value appeared in non-nullable field:
- field (class: "scala.Long", name: "age")
- root class: "com.gcp.model.Person"
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
The exception you get should explain everything, but let's go step by step:

  • When you load data using the csv data source, all fields are marked as nullable:

    val path: String = ???

    val peopleDF = spark.read
      .option("inferSchema", "true")
      .option("header", "true")
      .option("delimiter", ",")
      .csv(path)

    peopleDF.printSchema
    root
     |-- name: string (nullable = true)
     |-- age: integer (nullable = true)
     |-- stat: string (nullable = true)
  • A missing field is represented as SQL NULL:

    +----+----+----+
    |name| age|stat|
    +----+----+----+
    | xyz|null|   s|
    +----+----+----+
  • Next, you convert Dataset[Row] to Dataset[Person], which uses Long to encode the age field. A Long in Scala cannot be null. Nevertheless, because the input schema is nullable, the output schema stays nullable:

    val peopleDS = peopleDF.as[Person]

    peopleDS.printSchema
    root
     |-- name: string (nullable = true)
     |-- age: integer (nullable = true)
     |-- stat: string (nullable = true)

    Note that as[T] doesn't affect the schema at all.

  • When you query the Dataset using SQL (on a registered table) or the DataFrame API, Spark won't deserialize the object. Since the schema is still nullable, we can execute:

    peopleDS.where($"age" > 30).show

    without any issues. This is just plain SQL logic, and NULL is a valid value.

  • When we use the statically typed Dataset API:

    peopleDS.filter(_.age > 30)

    Spark has to deserialize the object. Because a Long cannot be null (SQL NULL), it fails with the exception you've seen.

    If it weren't for that check, you'd get an NPE.

  • A correct statically typed representation of your data should use Option types:

    case class Person(name: String, age: Option[Long], stat: String)

    with adjusted filter function:

    peopleDS.filter(_.age.map(_ > 30).getOrElse(false))

    If you prefer you can use pattern matching:

    peopleDS.filter(_.age match {
      case Some(age) => age > 30
      case _         => false     // or case None => false
    })

    Note that you don't have to use optional types for name and stat (although it is recommended anyway). Because a Scala String is just a Java String, it can be null. Of course, if you go with this approach, you have to explicitly check whether the accessed values are null.
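The Option-based predicate and the explicit null check for String fields can be tried without a Spark session. The sketch below runs them on a plain Scala collection. The sample rows (apart from the xyz row shown earlier) and the helper names olderThan and statOrDefault are made up for illustration, and exists(_ > limit) is used as a shorter equivalent of map(_ > limit).getOrElse(false).

```scala
// Same shape as the corrected case class: age is optional,
// while name and stat are plain Strings and may therefore still be null.
final case class Person(name: String, age: Option[Long], stat: String)

object OptionFilterSketch {
  // Hypothetical sample rows standing in for the CSV input.
  val people: Seq[Person] = Seq(
    Person("abc", Some(22L), "m"),
    Person("xyz", None, "s"),       // missing age, i.e. SQL NULL in the source
    Person("def", Some(41L), null)  // a String field can still be null
  )

  // A missing age never satisfies the predicate.
  def olderThan(limit: Long)(p: Person): Boolean = p.age.exists(_ > limit)

  // Wrap a possibly-null String in Option before using it.
  def statOrDefault(p: Person): String = Option(p.stat).getOrElse("<unknown>")

  def main(args: Array[String]): Unit = {
    val matches = people.filter(olderThan(30))
    matches.foreach(p => println(s"${p.name}: stat=${statOrDefault(p)}"))
  }
}
```

A function of the same shape (Person => Boolean) is what the typed peopleDS.filter call expects, so the predicate should carry over unchanged once Person declares age as Option[Long].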

