 

Spark: convert a DataFrame to a Dataset using a case class with Option fields

I have the following case class:

case class Person(name: String, lastname: Option[String] = None, age: BigInt)

And the following json:

{ "name": "bemjamin", "age" : 1 }

When I try to transform my dataframe into a dataset:

spark.read.json("example.json")
  .as[Person].show()

It shows me the following error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'lastname' given input columns: [age, name];

My question is: if my schema is my case class, and it defines lastname as optional, shouldn't .as[Person] do the conversion?

I can easily fix this using a .map, but I would like to know if there is a cleaner alternative.
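For reference, a minimal sketch of the .map workaround mentioned above (assuming spark is an active SparkSession and that name and age are always present in the JSON):

import spark.implicits._

val people = spark.read.json("example.json").map { row =>
  // The inferred schema may lack the lastname column entirely, so guard the lookup;
  // Option(...) also covers rows where the column exists but holds null.
  val lastname =
    if (row.schema.fieldNames.contains("lastname")) Option(row.getAs[String]("lastname"))
    else None
  Person(row.getAs[String]("name"), lastname, BigInt(row.getAs[Long]("age")))
}

people.show()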

asked Mar 12 '19 by Bemjamin Quintino


2 Answers

There is one more option to solve the above issue. Two steps are required:

  1. Make sure that fields that can be missing are declared as nullable Scala types (like Option[_]).

  2. Provide a schema argument instead of depending on schema inference. You can, for example, use a Spark SQL Encoder:

    import org.apache.spark.sql.Encoders
    
    val schema = Encoders.product[Person].schema
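
As a side note (a hedged sketch; the decimal(38,0) mapping for BigInt is Spark's default, and the nullability flags shown are assumptions worth verifying), printing the derived schema shows that every field, including lastname, is present:

schema.printTreeString()
// root
//  |-- name: string (nullable = true)
//  |-- lastname: string (nullable = true)
//  |-- age: decimal(38,0) (nullable = true)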
    

You can update the code as below:

import org.apache.spark.sql.Encoders

// Derive the schema from the case class instead of inferring it from the data.
val schema = Encoders.product[Person].schema

val df = spark.read
           .schema(schema)
           .json("/Users/../Desktop/example.json")
           .as[Person]

df.show()

+--------+--------+---+
|    name|lastname|age|
+--------+--------+---+
|bemjamin|    null|  1|
+--------+--------+---+
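
As a quick check (a sketch, not part of the original answer), collecting the Dataset confirms that the missing column decodes back into None:

df.collect().foreach(println)
// Person(bemjamin,None,1)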
answered Oct 05 '22 by KZapagol


When you perform spark.read.json("example.json").as[Person].show(), Spark basically reads the dataframe as

FileScan json [age#6L,name#7]

and then tries to apply the Encoder for the Person object, hence the AnalysisException: it is not able to find lastname in your json file.
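
You can see this inferred schema directly (a minimal sketch, assuming the same example.json as in the question) by printing it before applying the encoder:

spark.read.json("example.json").printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)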

You could either hint Spark that lastname is optional by supplying some data that contains a lastname, or try this:

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

// Build the schema from the case class via reflection rather than inference.
val schema: StructType = ScalaReflection.schemaFor[Person].dataType.asInstanceOf[StructType]
val x = spark.read
      .schema(schema)
      .json("src/main/resources/json/x.json")
      .as[Person]

x.show()

+--------+--------+---+
|    name|lastname|age|
+--------+--------+---+
|bemjamin|    null|  1|
+--------+--------+---+
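
Both answers are equivalent in effect: ScalaReflection.schemaFor[Person] and Encoders.product[Person].schema derive the same StructType from the case class, so either works as the explicit read schema. A small sketch comparing the two (it should print true, though that is an assumption worth verifying on your Spark version):

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

val viaEncoder    = Encoders.product[Person].schema
val viaReflection = ScalaReflection.schemaFor[Person].dataType.asInstanceOf[StructType]
println(viaEncoder == viaReflection) // expected: true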

Hope it helps.

answered Oct 05 '22 by Achilleus