I have a Spark DataFrame that consists of a series of dates:
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.types import *
import pandas as pd

sqlContext = SQLContext(sc)

rdd = sc.parallelize([('X01','2014-02-13T12:36:14.899','2014-02-13T12:31:56.876','sip:4534454450'),
                      ('X02','2014-02-13T12:35:37.405','2014-02-13T12:32:13.321','sip:6413445440'),
                      ('X03','2014-02-13T12:36:03.825','2014-02-13T12:32:15.229','sip:4534437492'),
                      ('XO4','2014-02-13T12:37:05.460','2014-02-13T12:32:36.881','sip:6474454453'),
                      ('XO5','2014-02-13T12:36:52.721','2014-02-13T12:33:30.323','sip:8874458555')])

schema = StructType([StructField('ID', StringType(), True),
                     StructField('EndDateTime', StringType(), True),
                     StructField('StartDateTime', StringType(), True),
                     StructField('ANI', StringType(), True)])

df = sqlContext.createDataFrame(rdd, schema)
What I want to do is find the duration by subtracting StartDateTime from EndDateTime. I figured I'd try and do this using a function:
# Function to calculate time delta
def time_delta(y, x):
    end = pd.to_datetime(y)
    start = pd.to_datetime(x)
    delta = (end - start)
    return delta

# add new column 'Duration' to a new DataFrame by applying the time_delta function
df2 = df.withColumn('Duration', time_delta(df.EndDateTime, df.StartDateTime))
However this just gives me:
>>> df2.show()
ID  EndDateTime           StartDateTime         ANI             Duration
X01 2014-02-13T12:36:...  2014-02-13T12:31:...  sip:4534454450  null
X02 2014-02-13T12:35:...  2014-02-13T12:32:...  sip:6413445440  null
X03 2014-02-13T12:36:...  2014-02-13T12:32:...  sip:4534437492  null
XO4 2014-02-13T12:37:...  2014-02-13T12:32:...  sip:6474454453  null
XO5 2014-02-13T12:36:...  2014-02-13T12:33:...  sip:8874458555  null
I'm not sure if my approach is correct or not. If not, I'd gladly accept another suggested way to achieve this.
As of Spark 1.5 you can use unix_timestamp:
from pyspark.sql import functions as F

timeFmt = "yyyy-MM-dd'T'HH:mm:ss.SSS"
timeDiff = (F.unix_timestamp('EndDateTime', format=timeFmt)
            - F.unix_timestamp('StartDateTime', format=timeFmt))
df = df.withColumn("Duration", timeDiff)
Note the Java-style time format.
>>> df.show()
+---+--------------------+--------------------+--------+
| ID|         EndDateTime|       StartDateTime|Duration|
+---+--------------------+--------------------+--------+
|X01|2014-02-13T12:36:...|2014-02-13T12:31:...|     258|
|X02|2014-02-13T12:35:...|2014-02-13T12:32:...|     204|
|X03|2014-02-13T12:36:...|2014-02-13T12:32:...|     228|
|XO4|2014-02-13T12:37:...|2014-02-13T12:32:...|     269|
|XO5|2014-02-13T12:36:...|2014-02-13T12:33:...|     202|
+---+--------------------+--------------------+--------+
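Note that unix_timestamp works in whole seconds, so any millisecond part is dropped. If you also want the duration in minutes, one option is to reuse the same seconds difference and divide by 60 before adding the column. This is only a minimal sketch using the same DataFrame and time format as above; the DurationSeconds and DurationMinutes column names are just illustrative:

from pyspark.sql import functions as F

timeFmt = "yyyy-MM-dd'T'HH:mm:ss.SSS"
secondsDiff = (F.unix_timestamp('EndDateTime', format=timeFmt)
               - F.unix_timestamp('StartDateTime', format=timeFmt))

# duration in whole seconds, plus a derived (fractional) minutes column
df = (df.withColumn('DurationSeconds', secondsDiff)
        .withColumn('DurationMinutes', secondsDiff / 60))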
Thanks to David Griffin. Here's how to do this for future reference.
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
from pyspark.sql.functions import udf

sqlContext = SQLContext(sc)

# Build sample data
rdd = sc.parallelize([('X01','2014-02-13T12:36:14.899','2014-02-13T12:31:56.876'),
                      ('X02','2014-02-13T12:35:37.405','2014-02-13T12:32:13.321'),
                      ('X03','2014-02-13T12:36:03.825','2014-02-13T12:32:15.229'),
                      ('XO4','2014-02-13T12:37:05.460','2014-02-13T12:32:36.881'),
                      ('XO5','2014-02-13T12:36:52.721','2014-02-13T12:33:30.323')])

schema = StructType([StructField('ID', StringType(), True),
                     StructField('EndDateTime', StringType(), True),
                     StructField('StartDateTime', StringType(), True)])

df = sqlContext.createDataFrame(rdd, schema)

# define timedelta function (obtain duration in seconds)
def time_delta(y, x):
    from datetime import datetime
    end = datetime.strptime(y, '%Y-%m-%dT%H:%M:%S.%f')
    start = datetime.strptime(x, '%Y-%m-%dT%H:%M:%S.%f')
    delta = int((end - start).total_seconds())  # truncate to whole seconds to match IntegerType
    return delta

# register as a UDF
f = udf(time_delta, IntegerType())

# Apply function
df2 = df.withColumn('Duration', f(df.EndDateTime, df.StartDateTime))
Applying time_delta() will give you the duration in seconds:
>>> df2.show()
ID  EndDateTime           StartDateTime         Duration
X01 2014-02-13T12:36:...  2014-02-13T12:31:...  258
X02 2014-02-13T12:35:...  2014-02-13T12:32:...  204
X03 2014-02-13T12:36:...  2014-02-13T12:32:...  228
XO4 2014-02-13T12:37:...  2014-02-13T12:32:...  268
XO5 2014-02-13T12:36:...  2014-02-13T12:33:...  202
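If millisecond precision matters, total_seconds() already returns a float, so one variation (a small sketch under the same setup, not part of the original answer; time_delta_float, f_float and df3 are just illustrative names) is to register the UDF with DoubleType and keep the fractional part:

from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf
from datetime import datetime

def time_delta_float(y, x):
    # same parsing as time_delta above, but keep the fractional seconds
    end = datetime.strptime(y, '%Y-%m-%dT%H:%M:%S.%f')
    start = datetime.strptime(x, '%Y-%m-%dT%H:%M:%S.%f')
    return (end - start).total_seconds()

f_float = udf(time_delta_float, DoubleType())
df3 = df.withColumn('Duration', f_float(df.EndDateTime, df.StartDateTime))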