Spark SQL's Scala API - TimestampType - No Encoder found for org.apache.spark.sql.types.TimestampType

I am using Spark 2.1 with Scala 2.11 on a Databricks notebook

What exactly is TimestampType?

We know from Spark SQL's documentation that the official timestamp type is TimestampType, which is apparently an alias for java.sql.Timestamp:

TimestampType can be found here in Spark SQL's Scala API.
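
Inspecting it in a notebook cell (just a quick check), it appears to be a DataType object used to describe a schema, rather than a value type:

import org.apache.spark.sql.types._

val dt: DataType = TimestampType // compiles: TimestampType is a DataType object
dt.typeName // "timestamp"
dt.sql      // "TIMESTAMP"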

We get different behavior when using a schema versus the Dataset API

when parsing {"time":1469501297,"action":"Open"} from Databricks' Scala Structured Streaming example:

Using a JSON schema --> OK (although I would prefer using the more elegant Dataset API):

import org.apache.spark.sql.types._

val jsonSchema = new StructType().add("time", TimestampType).add("action", StringType)

val staticInputDF = 
  spark
    .read
    .schema(jsonSchema)
    .json(inputPath)
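
A printSchema on the result confirms that the supplied schema drives the parsing, so time becomes a real timestamp column:

staticInputDF.printSchema

root
 |-- time: timestamp (nullable = true)
 |-- action: string (nullable = true)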

Using the Dataset API --> KO: No Encoder found for TimestampType

Creating the Event case class:

import org.apache.spark.sql.types._
case class Event(action: String, time: TimestampType)
--> defined class Event

Reading the events from DBFS on Databricks then fails at runtime: the case class compiles (TimestampType is an ordinary Scala class, so it is a legal field type), but Spark cannot derive an encoder for it.

Note: we don't get the error when using java.sql.Timestamp as the type for "time".

val path = "/databricks-datasets/structured-streaming/events/"
val events = spark.read.json(path).as[Event]

Error message

java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.types.TimestampType
- field (class: "org.apache.spark.sql.types.TimestampType", name: "time")
- root class: 
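
To double-check the note above, an encoder can be derived explicitly once the field type is java.sql.Timestamp (a minimal sketch using Encoders.product):

import org.apache.spark.sql.Encoders

case class Event(action: String, time: java.sql.Timestamp)

// Derivation succeeds: java.sql.Timestamp is the external JVM type
// that Spark maps to an internal TimestampType column
val eventEncoder = Encoders.product[Event]
eventEncoder.schema // StructType(StructField(action,StringType,true), StructField(time,TimestampType,true))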
asked Jun 01 '17 by Paul Leclercq
1 Answer

Combining the schema read method .schema(jsonSchema) with the as[Type] method, where the case class uses java.sql.Timestamp, solves this issue. The idea came after reading the Structured Streaming documentation, Creating streaming DataFrames and streaming Datasets:

These examples generate streaming DataFrames that are untyped, meaning that the schema of the DataFrame is not checked at compile time, only checked at runtime when the query is submitted. Some operations like map, flatMap, etc. need the type to be known at compile time. To do those, you can convert these untyped streaming DataFrames to typed streaming Datasets using the same methods as static DataFrame.

import org.apache.spark.sql.types._

val path = "/databricks-datasets/structured-streaming/events/"

val jsonSchema = new StructType().add("time", TimestampType).add("action", StringType)

case class Event(action: String, time: java.sql.Timestamp)

val staticInputDS = 
  spark
    .read
    .schema(jsonSchema)
    .json(path)
    .as[Event]

staticInputDS.printSchema

This will output:

root
 |-- time: timestamp (nullable = true)
 |-- action: string (nullable = true)
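
And since staticInputDS is now a typed Dataset[Event], the compile-time operations mentioned in the quoted documentation become available, e.g. (a minimal sketch):

// Field names and types are checked at compile time;
// a typo such as _.actoin would be a compilation error, not a runtime one
val opens = staticInputDS.filter(_.action == "Open")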
answered by Paul Leclercq