Spark 2.0 Timestamp Difference in Milliseconds using Scala

I am using Spark 2.0 and looking for a way to achieve the following in Scala:

I need the timestamp difference in milliseconds between two DataFrame column values.

Value_1 = 06/13/2017 16:44:20.044
Value_2 = 06/13/2017 16:44:21.067

The data type of both values is timestamp.

Note: Applying the function unix_timestamp(Column s) to both values and subtracting works, but only down to whole seconds, not to the millisecond precision that is required.
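
For illustration, a minimal sketch of that limitation, assuming Spark 2.x and a DataFrame df holding the two timestamp columns Value_1 and Value_2:

import org.apache.spark.sql.functions.{col, unix_timestamp}

// unix_timestamp returns whole seconds since the epoch, so the sub-second
// part of both timestamps is dropped before the subtraction
df.select(
  (unix_timestamp(col("Value_2")) - unix_timestamp(col("Value_1"))).as("diff_seconds")
)
// yields 1 (second) for the example values, not the required 1023 milliseconds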

The final query would look like this:

Select timestamp_diff(Value_2, Value_1) from table1

This should return the following output:

1023 milliseconds

where timestamp_diff is the function that would calculate the difference in milliseconds.


2 Answers

One way would be to use Unix epoch time, the number of milliseconds since 1 January 1970. Below is an example using a UDF; it takes two timestamps and returns the difference between them in milliseconds.

import java.sql.Timestamp
import org.apache.spark.sql.functions.{col, udf}

// getTime returns epoch milliseconds, so the subtraction is already in ms
val timestamp_diff = udf((startTime: Timestamp, endTime: Timestamp) => {
  startTime.getTime() - endTime.getTime()
})

// df is a DataFrame with two timestamp columns, col1 and col2
val result = df.withColumn("diff", timestamp_diff(col("col2"), col("col1")))
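
For reference, a small end-to-end sketch with the question's example values; it assumes a SparkSession named spark and keeps the hypothetical column names col1 and col2 from above:

import spark.implicits._

// one-row DataFrame holding the two example timestamps
val sample = Seq(
  (Timestamp.valueOf("2017-06-13 16:44:20.044"),
   Timestamp.valueOf("2017-06-13 16:44:21.067"))
).toDF("col1", "col2")

// the diff column comes out as 1023 (milliseconds)
sample.withColumn("diff", timestamp_diff(col("col2"), col("col1"))).show(false)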

Alternatively, you can register the function to use with SQL commands:

val timestamp_diff = (startTime: Timestamp, endTime: Timestamp) => {
  (startTime.getTime() - endTime.getTime())
}

spark.sqlContext.udf.register("timestamp_diff", timestamp_diff)
df.createOrReplaceTempView("table1")

val df2 = spark.sqlContext.sql("SELECT *, timestamp_diff(col2, col1) as diff from table1")

The same approach in PySpark:

import datetime

def timestamp_diff(time1: datetime.datetime, time2: datetime.datetime):
    # subtracting two datetimes gives a timedelta; total_seconds() keeps the
    # fractional part, so * 1000 converts the difference to milliseconds
    return int((time1 - time2).total_seconds() * 1000)

The int() call and the * 1000 factor are only there so the result comes out as whole milliseconds.

Example usage:

spark.udf.register("timestamp_diff", timestamp_diff)    

df.registerTempTable("table1")

df2 = spark.sql("SELECT *, timestamp_diff(col2, col1) as diff from table1")

It's not an optimal solution since UDFs are usually slow, so you might run into performance issues.
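
If that overhead matters, one built-in-only alternative is to skip the UDF altogether; sketched here in Scala, assuming the columns are again named col1 and col2. Casting a timestamp to double yields epoch seconds including the fractional part.

import org.apache.spark.sql.functions.col

// no UDF: cast both timestamps to epoch seconds (with sub-second precision),
// subtract, and convert the result to whole milliseconds
val withDiff = df.withColumn(
  "diff",
  ((col("col2").cast("double") - col("col1").cast("double")) * 1000).cast("long")
)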
