I am using Spark 2.1 with Scala 2.11 on a Databricks notebook.
What exactly is TimestampType?
We know from the Spark SQL documentation that the official timestamp type is TimestampType, which apparently corresponds to java.sql.Timestamp:
TimestampType can be found in the Spark SQL Scala API, in the org.apache.spark.sql.types package
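To make the distinction concrete, here is a minimal sketch of how I understand the two types (the literal timestamp value is just an example):

import org.apache.spark.sql.types.{StructField, TimestampType}

// TimestampType is a Spark SQL DataType: it describes the type of a
// column inside a schema, but it is not itself a JVM value type.
val field = StructField("time", TimestampType, nullable = true)

// java.sql.Timestamp is the JVM type that Spark maps timestamp columns to.
val ts = java.sql.Timestamp.valueOf("2016-07-26 03:48:17")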
We get different behavior when using a schema versus the Dataset API
When parsing {"time":1469501297,"action":"Open"}
from the Databricks Scala Structured Streaming example
Using a JSON schema --> OK (though I would prefer the more elegant Dataset API):
import org.apache.spark.sql.types._

val jsonSchema = new StructType().add("time", TimestampType).add("action", StringType)

val staticInputDF =
  spark
    .read
    .schema(jsonSchema)
    .json(inputPath)
Using the Dataset API --> KO: No Encoder found for TimestampType
Creating the Event class
import org.apache.spark.sql.types._
case class Event(action: String, time: TimestampType)
--> defined class Event
This errors when reading the events from DBFS on Databricks.
Note: we don't get the error when using java.sql.Timestamp as the type for "time".
val path = "/databricks-datasets/structured-streaming/events/"
val events = spark.read.json(path).as[Event]
Error message
java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.types.TimestampType
- field (class: "org.apache.spark.sql.types.TimestampType", name: "time")
- root class:
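For what it's worth, the same failure can be reproduced without reading any JSON: deriving a product encoder for the class throws the same exception, which suggests the problem is encoder derivation rather than the read itself (a quick check, assuming a Spark 2.1 notebook):

import org.apache.spark.sql.Encoders

// Encoder derivation inspects Event's fields via reflection and throws
// java.lang.UnsupportedOperationException: No Encoder found for
// org.apache.spark.sql.types.TimestampType
Encoders.product[Event]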
Combining the schema read method .schema(jsonSchema) with the as[Event] method, where Event declares time as java.sql.Timestamp, solves this issue. Encoders map JVM types to Spark's internal representation; TimestampType is a schema descriptor, not a JVM type, so no encoder can exist for it. The idea came after reading the Structured Streaming documentation, Creating streaming DataFrames and streaming Datasets:
These examples generate streaming DataFrames that are untyped, meaning that the schema of the DataFrame is not checked at compile time, only checked at runtime when the query is submitted. Some operations like map, flatMap, etc. need the type to be known at compile time. To do those, you can convert these untyped streaming DataFrames to typed streaming Datasets using the same methods as static DataFrame.
import org.apache.spark.sql.types._

val path = "/databricks-datasets/structured-streaming/events/"
val jsonSchema = new StructType().add("time", TimestampType).add("action", StringType)

case class Event(action: String, time: java.sql.Timestamp)

val staticInputDS =
  spark
    .read
    .schema(jsonSchema)
    .json(path)
    .as[Event]
staticInputDS.printSchema
will output:
root
|-- time: timestamp (nullable = true)
|-- action: string (nullable = true)
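With the typed Dataset in place, operations that need the type at compile time, such as map and filter, now work. A minimal sketch (the val names and the filter predicate are just illustrations):

import spark.implicits._ // provides the Long encoder used by map (auto-imported in notebooks)

// Typed, compile-time-checked operations on the Dataset[Event]:
val opens = staticInputDS.filter(e => e.action == "Open")
val epochSeconds = staticInputDS.map(e => e.time.getTime / 1000)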