Adding 12 hours to datetime column in Spark

I have tried to search quite a bit, but could only find the add_months function in Spark SQL, so I am opening a new thread here. I would appreciate any help someone could offer.

I am trying to add 12, 24, and 48 hours to a date column in Spark SQL using sqlContext. I am using Spark 1.6.1 and I need something like this:

SELECT N1.subject_id, '12-HOUR' AS notes_period, N1.chartdate_start, N2.chartdate, N2.text
FROM NOTEEVENTS N2,
     (SELECT subject_id, MIN(chartdate) AS chartdate_start
      FROM NOTEEVENTS
      WHERE subject_id = 283
        AND category != 'Discharge summary'
      GROUP BY subject_id) N1
WHERE N2.subject_id = N1.subject_id
  AND N2.chartdate < N1.chartdate_start + interval '1 hour' * 12

Please notice the last clause, which uses PostgreSQL syntax and is what I need to express in Spark SQL. I'd really appreciate any help I could get.

Thanks.

asked Nov 30 '16 by Ahsan

3 Answers

Same as in PostgreSQL, you can use INTERVAL. In SQL:

spark.sql("""SELECT current_timestamp() AS now, 
                    current_timestamp() + INTERVAL 12 HOURS AS now_plus_twelve"""
).show(false)
+-----------------------+-----------------------+
|now                    |now_plus_twelve        |
+-----------------------+-----------------------+
|2017-12-14 10:49:15.115|2017-12-14 22:49:15.115|
+-----------------------+-----------------------+
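Applied to the question's query, the same INTERVAL arithmetic could look like the sketch below (an untested sketch assuming NOTEEVENTS is registered as a table; like the example above it uses a Spark 2.x session, so it may not run on 1.6):

// Sketch: the question's query with the final predicate rewritten
// using INTERVAL arithmetic (assumes Spark 2.x and a registered
// NOTEEVENTS table).
spark.sql("""
  SELECT N1.subject_id, '12-HOUR' AS notes_period,
         N1.chartdate_start, N2.chartdate, N2.text
  FROM NOTEEVENTS N2
  JOIN (SELECT subject_id, MIN(chartdate) AS chartdate_start
        FROM NOTEEVENTS
        WHERE subject_id = 283
          AND category != 'Discharge summary'
        GROUP BY subject_id) N1
    ON N2.subject_id = N1.subject_id
  WHERE N2.chartdate < N1.chartdate_start + INTERVAL 12 HOURS
""").show(false)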

With the Dataset API in Scala:

import org.apache.spark.sql.functions.{current_timestamp, expr}

spark.range(1)
  .select(
    current_timestamp as "now", 
    current_timestamp + expr("INTERVAL 12 HOURS") as "now_plus_twelve"
  ).show(false)
+-----------------------+-----------------------+
|now                    |now_plus_twelve        |
+-----------------------+-----------------------+
|2017-12-14 10:56:59.185|2017-12-14 22:56:59.185|
+-----------------------+-----------------------+
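Since the question asks for 12-, 24-, and 48-hour offsets, the same expression can be generated once per offset (a sketch; notes is a hypothetical DataFrame with a chartdate_start timestamp column):

import org.apache.spark.sql.functions.expr

// Hypothetical: add one shifted column per requested offset to a
// DataFrame `notes` that has a chartdate_start timestamp column.
val withOffsets = Seq(12, 24, 48).foldLeft(notes) { (df, h) =>
  df.withColumn(s"start_plus_${h}h", expr(s"chartdate_start + INTERVAL $h HOURS"))
}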

Python:

from pyspark.sql.functions import current_timestamp, expr

spark.range(1).select(
    current_timestamp().alias("now"),
    (current_timestamp() + expr("INTERVAL 12 HOURS")).alias("now_plus_twelve")
).show(truncate=False)
answered Nov 24 '22 by zero323


Currently there is no such built-in function, but you can write a UDF:

import java.sql.Timestamp

sqlContext.udf.register("add_hours", (datetime: Timestamp, hours: Int) => {
  // Shift the timestamp by the requested number of hours, in milliseconds.
  new Timestamp(datetime.getTime() + hours * 60 * 60 * 1000)
})

For example:

SELECT N1.subject_id, '12-HOUR' AS notes_period, N1.chartdate_start, N2.chartdate, N2.text
FROM NOTEEVENTS N2,
     (SELECT subject_id, MIN(chartdate) AS chartdate_start
      FROM NOTEEVENTS
      WHERE subject_id = 283
        AND category != 'Discharge summary'
      GROUP BY subject_id) N1
WHERE N2.subject_id = N1.subject_id
  AND N2.chartdate < add_hours(N1.chartdate_start, 12)

You can also use the unix_timestamp function to calculate the new date. It's less readable in my opinion, but it can use whole-stage code generation. The code is inspired by Anton Okolnychyi's other answer:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

// Column-based helper: shift a timestamp column by a number of hours,
// going through epoch seconds.
val addHours = (datetime: Column, hours: Column) =>
  from_unixtime(unix_timestamp(datetime) + hours * 60 * 60)
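The helper can then replace the UDF call in a DataFrame condition, for example (a sketch; n1 and n2 are hypothetical DataFrames matching the aliases in the question):

import org.apache.spark.sql.functions.lit

// Illustrative join using the Column-based helper instead of a UDF;
// lit(12) turns the hour count into a Column.
val joined = n2.join(n1,
  n2("subject_id") === n1("subject_id") &&
  n2("chartdate") < addHours(n1("chartdate_start"), lit(12)))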
answered Nov 24 '22 by T. Gawęda


What about using the unix_timestamp() function to convert a date to a timestamp in seconds and then adding hours * 60 * 60 to it?

Then your condition will look like:

unix_timestamp(n2.chartdate) < (unix_timestamp(n1.chartdate_start) + 12 * 60 * 60)
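Epoch-second arithmetic like this is easy to sanity-check on a literal (a sketch; 12 * 60 * 60 = 43200 seconds):

// Quick check: 10:00 plus 43200 seconds renders as 22:00 the same day
// (assuming the session time zone has no DST jump in between).
sqlContext.sql(
  "SELECT from_unixtime(unix_timestamp('2016-11-30 10:00:00') + 12 * 60 * 60)"
).show(false)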

answered Nov 24 '22 by Anton Okolnychyi