I am using Apache Spark 2.0 and creating a case class to define the schema for a Dataset. When I try to define a custom encoder for java.time.LocalDate, following "How to store custom objects in Dataset?", I get the following exception:
java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate
- field (class: "java.time.LocalDate", name: "callDate")
- root class: "FireService"
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:598)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:592)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:583)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
............
Here is my code:
case class FireService(callNumber: String, callDate: java.time.LocalDate)
implicit val localDateEncoder: org.apache.spark.sql.Encoder[java.time.LocalDate] = org.apache.spark.sql.Encoders.kryo[java.time.LocalDate]
val fireServiceDf = df.map(row => {
  val dateFormatter = java.time.format.DateTimeFormatter.ofPattern("MM/dd/yyyy")
  FireService(row.getAs[String](0), java.time.LocalDate.parse(row.getAs[String](4), dateFormatter))
})
How can we define an encoder for a third-party API's types in Spark?
Update
When I create an encoder for the whole case class, df.map serializes each object to binary, as below:
implicit val fireServiceEncoder: org.apache.spark.sql.Encoder[FireService] = org.apache.spark.sql.Encoders.kryo[FireService]
val fireServiceDf = df.map(row => {
  val dateFormatter = java.time.format.DateTimeFormatter.ofPattern("MM/dd/yyyy")
  FireService(row.getAs[String](0), java.time.LocalDate.parse(row.getAs[String](4), dateFormatter))
})
fireServiceDf: org.apache.spark.sql.Dataset[FireService] = [value: binary]
I expected a Dataset with FireService's fields as columns, but I get a single binary column instead.
As the last comment there says, "if class contains a field Bar you need encoder for a whole object." You need to provide an implicit Encoder for FireService itself; otherwise Spark constructs one for you using SQLImplicits.newProductEncoder[T <: Product : TypeTag]: Encoder[T]. You can see from the type that it doesn't take any implicit Encoder parameters for the fields, so it can't make use of the presence of localDateEncoder.
Spark could be changed to handle this e.g. using the Shapeless library, or using macros directly; I don't know whether this is the plan in the future.
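Since the derived product encoder only handles field types it already knows, one possible workaround (not part of the answer above, just a sketch) is to store the date as java.sql.Date, which Spark's built-in encoders support, and convert from java.time.LocalDate while mapping. The two-column sample input, the parseCallDate helper, and the FireServiceExample object are assumptions made for illustration; the original df reads the date from column 4.

```scala
import java.sql.Date
import java.time.LocalDate
import java.time.format.DateTimeFormatter

import org.apache.spark.sql.{Dataset, SparkSession}

// Assumption of this sketch: callDate is a java.sql.Date rather than the
// original java.time.LocalDate, so the derived product encoder covers every field.
case class FireService(callNumber: String, callDate: Date)

object FireServiceExample {
  // Hypothetical helper: parse "MM/dd/yyyy" with java.time, then convert
  // the result to java.sql.Date.
  def parseCallDate(text: String): Date =
    Date.valueOf(LocalDate.parse(text, DateTimeFormatter.ofPattern("MM/dd/yyyy")))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("fire-service-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical two-column input standing in for the question's df.
    val df = Seq(("160010001", "01/15/2016")).toDF("callNumber", "callDate")

    // No explicit encoder is declared: spark.implicits._ supplies the
    // product encoder for FireService.
    val fireServiceDs: Dataset[FireService] = df.map { row =>
      FireService(row.getAs[String](0), parseCallDate(row.getAs[String](1)))
    }

    fireServiceDs.printSchema()
    spark.stop()
  }
}
```

With this shape the schema should list callNumber and callDate as separate columns instead of one binary value column. The trade-off is that code reading the Dataset has to call toLocalDate on the field when it needs the java.time API.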