Spark Structured Streaming automatically converts timestamps to local time

My timestamps are in UTC and ISO 8601 format, but with Structured Streaming they get automatically converted to local time. Is there a way to stop this conversion? I would like to keep them in UTC.

I'm reading JSON data from Kafka and then parsing it using the from_json Spark function.

Input:

{"Timestamp":"2015-01-01T00:00:06.222Z"}

Flow:

SparkSession
  .builder()
  .master("local[*]")
  .appName("my-app")
  .getOrCreate()
  .readStream()
  .format("kafka")
  ... //some magic
  .writeStream()
  .format("console")
  .start()
  .awaitTermination();

Schema:

StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("Timestamp", DataTypes.TimestampType, true)
});
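
For context, the elided parsing step usually looks something like the following minimal sketch, where kafkaStream stands in for the dataset returned by the readStream() chain:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Deserialize the Kafka value as JSON using the schema above,
// then flatten the resulting struct column.
Dataset<Row> parsed = kafkaStream
        .select(from_json(col("value").cast("string"), schema).alias("data"))
        .select("data.*");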

Output:

+--------------------+
|           Timestamp|
+--------------------+
|2015-01-01 01:00:...|
|2015-01-01 01:00:...|
+--------------------+

As you can see, the hour has been shifted forward automatically.

PS: I tried experimenting with the from_utc_timestamp Spark function, but had no luck.
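
An attempt along those lines probably looked like the sketch below (parsed is the hypothetical dataset from the parsing step above). from_utc_timestamp shifts the underlying instant, which is a no-op for data that is already UTC; it does not change the session timezone the console sink uses for rendering, which would explain the lack of luck:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_utc_timestamp;

// Re-interprets the instant (UTC -> UTC, a no-op here); the console
// output is still rendered in the session/JVM timezone.
Dataset<Row> attempt = parsed.withColumn(
        "Timestamp", from_utc_timestamp(col("Timestamp"), "UTC"));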

asked Feb 13 '18 by Martin Brisiak



2 Answers

For me, the following worked:

spark.conf.set("spark.sql.session.timeZone", "UTC")

It tells Spark SQL to use UTC as the default timezone for timestamps. I used it in Spark SQL, for example:

select *, cast('2017-01-01 10:10:10' as timestamp) from someTable

I know it does not work in 2.0.1 but it works in Spark 2.2. I also used it in SQLTransformer and it worked.

I am not sure about streaming though.
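
In Java, the same setting can also be applied while building the session; here is a minimal sketch mirroring the question's setup:

import org.apache.spark.sql.SparkSession;

// Set the SQL session timezone up front so timestamps are rendered
// in UTC instead of the JVM's local zone.
SparkSession spark = SparkSession
        .builder()
        .master("local[*]")
        .appName("my-app")
        .config("spark.sql.session.timeZone", "UTC")
        .getOrCreate();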

answered Oct 18 '22 by astro_asz


Note:

This answer is useful primarily in Spark < 2.2. For newer Spark versions, see the answer by astro_asz above.

However, we should note that as of Spark 2.4.0, spark.sql.session.timeZone doesn't set user.timezone (java.util.TimeZone.getDefault). So setting spark.sql.session.timeZone alone can result in a rather awkward situation where SQL and non-SQL components use different timezone settings.

Therefore I still recommend setting user.timezone explicitly, even if spark.sql.session.timeZone is set.
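
On the driver side, setting both together might look like this sketch (assuming a spark session is in scope; executors still need the JVM option shown below):

import java.util.TimeZone;

// Align the driver JVM default (user.timezone) with the SQL session zone.
// This only covers the driver; executors need
// spark.executor.extraJavaOptions=-Duser.timezone=UTC (see below).
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
spark.conf().set("spark.sql.session.timeZone", "UTC");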

TL;DR Unfortunately, this is how Spark handles timestamps right now, and there is really no built-in alternative other than operating on epoch time directly, without using date/time utilities.
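
For illustration, operating on epoch time directly can be as simple as a cast (a sketch; parsed and ts_epoch are illustrative names):

import static org.apache.spark.sql.functions.col;

// Casting TimestampType to long yields seconds since the Unix epoch,
// independent of any session or JVM timezone setting.
Dataset<Row> epoch = parsed.withColumn("ts_epoch", col("Timestamp").cast("long"));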

You can find an insightful discussion on the Spark developers list: SQL TIMESTAMP semantics vs. SPARK-18350

The cleanest workaround I've found so far is to set -Duser.timezone to UTC for both the driver and executors. For example, when launching spark-shell:

bin/spark-shell --conf "spark.driver.extraJavaOptions=-Duser.timezone=UTC" \
                --conf "spark.executor.extraJavaOptions=-Duser.timezone=UTC"

or by adjusting configuration files (spark-defaults.conf):

spark.driver.extraJavaOptions      -Duser.timezone=UTC
spark.executor.extraJavaOptions    -Duser.timezone=UTC
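
After launching with the options above, a quick sanity check (a hypothetical snippet, assuming a spark session is in scope):

// Both should print "UTC" if the JVM option and the session setting took effect.
System.out.println(java.util.TimeZone.getDefault().getID());
System.out.println(spark.conf().get("spark.sql.session.timeZone"));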

answered Oct 18 '22 by zero323