
How to convert unix timestamp to the given timezone with Spark

I am using Spark 1.6.2.

I have epochs like this:

|unix_timestamp|UTC                |Europe/Helsinki    |
|1491771599    |2017-04-09 20:59:59|2017-04-09 23:59:59|
|1491771600    |2017-04-09 21:00:00|2017-04-10 00:00:00|
|1491771601    |2017-04-09 21:00:01|2017-04-10 00:00:01|

The default time zone on the Spark machines is Europe/Prague:

#timezone = DefaultTz: Europe/Prague, SparkUtilTz: Europe/Prague

This line is the output of:

logger.info("#timezone = DefaultTz: {}, SparkUtilTz: {}", TimeZone.getDefault.getID, org.apache.spark.sql.catalyst.util.DateTimeUtils.defaultTimeZone.getID)
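A minimal sketch, in plain Scala without Spark, of why the JVM default time zone matters when reading the tables in this question. It relies only on the fact that java.sql.Timestamp.toString renders an instant in the JVM default zone; the object and method names (DefaultZoneRendering, render) are hypothetical and chosen for illustration, and how Spark 1.6.2 formats TimestampType values internally is not shown here.

```scala
import java.sql.Timestamp
import java.util.TimeZone

// Hypothetical helper, not part of Spark: shows how the same epoch is
// printed differently depending on the JVM default time zone.
object DefaultZoneRendering {

  // Temporarily switch the JVM default zone, render the epoch (in seconds)
  // as a java.sql.Timestamp string, then restore the previous default.
  def render(epochSeconds: Long, defaultZone: String): String = {
    val saved = TimeZone.getDefault
    try {
      TimeZone.setDefault(TimeZone.getTimeZone(defaultZone))
      new Timestamp(epochSeconds * 1000L).toString
    } finally {
      TimeZone.setDefault(saved)
    }
  }

  def main(args: Array[String]): Unit = {
    // Same instant, two different renderings.
    println(render(1491771600L, "UTC"))           // 2017-04-09 21:00:00.0
    println(render(1491771600L, "Europe/Prague")) // 2017-04-09 23:00:00.0
  }
}
```

The epoch itself carries no zone, so any shift applied to it is seen through the default zone when the value is printed.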

I want to count the timestamps grouped by date and hour in a given time zone (currently Europe/Helsinki, which is UTC+3 on these dates).

What I expect:

|date      |hour     |count|
|2017-04-09|23       |1    |
|2017-04-10|0        |2    |
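The expected table can be cross-checked outside Spark. The following is only a sketch using java.time (so it assumes Java 8 or later); ZoneBuckets, bucket and countPerHour are hypothetical names introduced for illustration and are not part of the Spark job.

```scala
import java.time.{Instant, ZoneId}

// Hypothetical reference implementation of the expected grouping,
// written with java.time instead of Spark SQL functions.
object ZoneBuckets {

  // Convert epoch seconds to a (date, hour) pair in the given zone.
  def bucket(epochSeconds: Long, zone: String): (String, Int) = {
    val zdt = Instant.ofEpochSecond(epochSeconds).atZone(ZoneId.of(zone))
    (zdt.toLocalDate.toString, zdt.getHour)
  }

  // Count how many epochs fall into each (date, hour) bucket.
  def countPerHour(epochs: Seq[Long], zone: String): Map[(String, Int), Int] =
    epochs.groupBy(bucket(_, zone)).map { case (key, values) => key -> values.size }

  def main(args: Array[String]): Unit = {
    val epochs = Seq(1491771599L, 1491771600L, 1491771601L)
    countPerHour(epochs, "Europe/Helsinki").toSeq.sorted.foreach {
      case ((date, hour), n) => println(s"$date $hour $n")
    }
    // prints:
    // 2017-04-09 23 1
    // 2017-04-10 0 2
  }
}
```

Because the conversion goes straight from the epoch to the target zone, the JVM default time zone plays no role in this calculation.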

Code (using from_utc_timestamp):

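import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DataTypes

def getCountsPerTime(sqlContext: SQLContext, inputDF: DataFrame, timeZone: String, aggr: String): DataFrame = {

    import sqlContext.implicits._

    // Interpret the epoch seconds as a timestamp and shift it to the target zone
    val onlyTime = inputDF.select(
         from_utc_timestamp($"unix_timestamp".cast(DataTypes.TimestampType),  timeZone).alias("time")
    )

    val visitsPerTime =
        if (aggr.equalsIgnoreCase("hourly")) {
            // Group by calendar date and hour of day
            onlyTime.groupBy(
                date_format($"time", "yyyy-MM-dd").alias("date"),
                date_format($"time", "H").cast(DataTypes.IntegerType).alias("hour")
            ).count()
        } else if (aggr.equalsIgnoreCase("daily")) {
            // Group by calendar date only
            onlyTime.groupBy(
                date_format($"time", "yyyy-MM-dd").alias("date")
            ).count()
        } else {
            throw new IllegalArgumentException("Unsupported aggregation: " + aggr)
        }

    visitsPerTime
}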

What I get:

|date      |hour     |count|
|2017-04-09|22       |1    |
|2017-04-09|23       |2    |

Trying to wrap it with to_utc_timestamp:

def getCountsPerTime(sqlContext: SQLContext, inputDF: DataFrame, timeZone: String, aggr: String): DataFrame = {

    import sqlContext.implicits._

    // Shift to the target zone, then shift back by the default zone's offset
    val onlyTime = inputDF.select(
        to_utc_timestamp(from_utc_timestamp($"unix_timestamp".cast(DataTypes.TimestampType), timeZone), DateTimeUtils.defaultTimeZone.getID).alias("time")
    )

    val visitsPerTime = ... // same as above
}



What I get:

|tradedate |tradehour|count|
|2017-04-09|20       |1    |
|2017-04-09|21       |2    |

How can I get the expected result?

albundyszabolcs asked Jun 27 '17 10:06


1 Answer

Your code does not run for me, so I couldn't reproduce the last two outputs you got.

However, here are some hints on how to get the output you expect.

I am assuming you already have a DataFrame like this:

|unix_timestamp|UTC                  |Europe/Helsinki      |
|1491750899    |2017-04-09 20:59:59.0|2017-04-09 23:59:59.0|
|1491750900    |2017-04-09 21:00:00.0|2017-04-10 00:00:00.0|
|1491750901    |2017-04-09 21:00:01.0|2017-04-10 00:00:01.0|

I created this DataFrame with the following code:

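import sqlContext.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DataTypes

// Sample input: timestamp strings in a single column named "unix_timestamp"
val inputDF = Seq(
      "2017-04-09 20:59:59",
      "2017-04-09 21:00:00",
      "2017-04-09 21:00:01"
    ).toDF("unix_timestamp")

// Epoch seconds plus the same instant shifted to UTC and to Europe/Helsinki
val onlyTime = inputDF.select(
      unix_timestamp($"unix_timestamp").alias("unix_timestamp"),
      from_utc_timestamp($"unix_timestamp".cast(DataTypes.TimestampType),  "UTC").alias("UTC"),
      from_utc_timestamp($"unix_timestamp".cast(DataTypes.TimestampType),  "Europe/Helsinki").alias("Europe/Helsinki")
    )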

Once you have the above DataFrame, you can get the output you want by splitting the timestamp into date and hour, then grouping and counting:

onlyTime.select(
            // "2017-04-09 23:59:59.0" -> date "2017-04-09", hour "23"
            split($"Europe/Helsinki", " ")(0).as("date"),
            split(split($"Europe/Helsinki", " ")(1), ":")(0).as("hour"))
          .groupBy("date", "hour").agg(count("date").as("count"))

The resulting DataFrame is:

|date      |hour|count|
|2017-04-09|23  |1    |
|2017-04-10|00  |2    |
Ramesh Maharjan answered Oct 10 '22 04:10
