
How to use java.time.LocalDate in Datasets (fails with java.lang.UnsupportedOperationException: No Encoder found)? [duplicate]

  • Spark 2.1.1
  • Scala 2.11.8
  • Java 8
  • Linux Ubuntu 16.04 LTS

I'd like to transform my RDD into a Dataset. For this, I use the implicit method toDS(), which gives me the following error:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate
- field (class: "java.time.LocalDate", name: "date")
- root class: "observatory.TemperatureRow"
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:344)

In my case, I must use the type java.time.LocalDate; I can't use java.sql.Date. I have read that I need to tell Spark how to transform the Java type into a SQL type, so in that direction I built the two implicit functions below:

implicit def toSerialized(t: TemperatureRow): EncodedTemperatureRow = EncodedTemperatureRow(t.date.toString, t.location, t.temperature)
implicit def fromSerialized(t: EncodedTemperatureRow): TemperatureRow = TemperatureRow(LocalDate.parse(t.date), t.location, t.temperature)

Below, some code about my application:

case class Location(lat: Double, lon: Double)

case class TemperatureRow(
                             date: LocalDate,
                             location: Location,
                             temperature: Double
                         )

case class EncodedTemperatureRow(
                             date: String,
                             location: Location,
                             temperature: Double
                         )

val s = Seq[TemperatureRow](
                    TemperatureRow(LocalDate.parse("2017-01-01"), Location(1.4,5.1), 4.9),
                    TemperatureRow(LocalDate.parse("2014-04-05"), Location(1.5,2.5), 5.5)
                )

import spark.implicits._
val temps: RDD[TemperatureRow] = sc.parallelize(s)
val tempsDS = temps.toDS

I don't know why Spark searches for an encoder for java.time.LocalDate; I provide implicit conversions between TemperatureRow and EncodedTemperatureRow...

asked Jul 19 '17 by JimyRyan

1 Answer

java.time.LocalDate is not supported up to Spark 2.2 (and I've been trying to write an Encoder for the type for some time and failed).

You have to convert java.time.LocalDate to some other supported type (e.g. java.sql.Timestamp or java.sql.Date), or to an epoch value or a date-time string.
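For example, here is a minimal sketch of the string route, reusing the EncodedTemperatureRow case class and the temps RDD from the question (it assumes a SparkSession named spark is in scope):

import java.time.LocalDate
import org.apache.spark.sql.Dataset
import spark.implicits._

// Map to the String-dated representation *before* asking Spark for an encoder,
// so every field is a supported type (String, Double, nested case class).
val encodedDS: Dataset[EncodedTemperatureRow] =
  temps
    .map(t => EncodedTemperatureRow(t.date.toString, t.location, t.temperature))
    .toDS

// Parse the date back on the driver side (mapping back inside the Dataset
// would again require an encoder for java.time.LocalDate).
val decoded: Array[TemperatureRow] =
  encodedDS.collect().map(e => TemperatureRow(LocalDate.parse(e.date), e.location, e.temperature))

// Alternatively, java.sql.Date is supported directly:
//   java.sql.Date.valueOf(t.date)   // LocalDate -> java.sql.Date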

answered Oct 12 '22 by Jacek Laskowski