
Date difference between consecutive rows - Pyspark Dataframe

I have a table with the following structure:

USER_ID     Tweet_ID                 Date
  1           1001       Thu Aug 05 19:11:39 +0000 2010
  1           6022       Mon Aug 09 17:51:19 +0000 2010
  1           1041       Sun Aug 19 11:10:09 +0000 2010
  2           9483       Mon Jan 11 10:51:23 +0000 2012
  2           4532       Fri May 21 11:11:11 +0000 2012
  3           4374       Sat Jul 10 03:21:23 +0000 2013
  3           4334       Sun Jul 11 04:53:13 +0000 2013

I would like a PySpark SQL query that calculates the date difference (in seconds) between consecutive records with the same user_id. The expected result would be:

1      Sun Aug 19 11:10:09 +0000 2010 - Mon Aug 09 17:51:19 +0000 2010     839930
1      Mon Aug 09 17:51:19 +0000 2010 - Thu Aug 05 19:11:39 +0000 2010     340780
2      Fri May 21 11:11:11 +0000 2012 - Mon Jan 11 10:51:23 +0000 2012     11319588
3      Sun Jul 11 04:53:13 +0000 2013 - Sat Jul 10 03:21:23 +0000 2013     91910
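
As a sanity check on the arithmetic for user 1, here is a minimal plain-Python sketch (no Spark involved). It assumes every Date value follows the format shown in the table; `TWITTER_FORMAT` and `seconds_between` are hypothetical names introduced only for this illustration.

```python
from datetime import datetime

# Assumed format of the Date column, e.g. "Thu Aug 05 19:11:39 +0000 2010".
TWITTER_FORMAT = "%a %b %d %H:%M:%S %z %Y"


def seconds_between(later: str, earlier: str) -> int:
    """Hypothetical helper: whole seconds elapsed from `earlier` to `later`."""
    delta = datetime.strptime(later, TWITTER_FORMAT) - datetime.strptime(earlier, TWITTER_FORMAT)
    return int(delta.total_seconds())


# The two consecutive pairs for user 1 from the table above.
first_gap = seconds_between("Mon Aug 09 17:51:19 +0000 2010",
                            "Thu Aug 05 19:11:39 +0000 2010")
second_gap = seconds_between("Sun Aug 19 11:10:09 +0000 2010",
                             "Mon Aug 09 17:51:19 +0000 2010")
print(first_gap, second_gap)
```

Both values agree with the expected result listed for user 1.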
Josemy asked Jul 02 '16 04:07


3 Answers

EDITED thanks to @cool_kid

@Josemy's answer is really good, but it didn't work for me because cast("bigint") threw an error. So I used the datediff function from the pyspark.sql.functions module instead, and it worked:

from pyspark.sql.functions import datediff, lag
from pyspark.sql.window import Window

df.withColumn("time_intertweet", datediff(df.date, lag(df.date, 1)
    .over(Window.partitionBy("user_id")
    .orderBy("date"))))
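
One caveat worth keeping in mind: datediff counts whole calendar days between two dates, not seconds, so on its own it does not produce the numbers the question asks for. A small plain-Python sketch of the distinction, using the first two tweets of user 1 (the variable names are only for illustration):

```python
from datetime import datetime, timezone

# First two tweets of user 1 from the question, written as UTC datetimes.
earlier = datetime(2010, 8, 5, 19, 11, 39, tzinfo=timezone.utc)
later = datetime(2010, 8, 9, 17, 51, 19, tzinfo=timezone.utc)

# Calendar-day difference: the granularity a day-based diff works at.
days_apart = (later.date() - earlier.date()).days

# Elapsed seconds: what the question actually asks for.
seconds_apart = int((later - earlier).total_seconds())

print(days_apart, seconds_apart)
```

The day count here is 4 even though slightly less than four full days (340780 seconds) have elapsed, because the time of day is ignored.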
LePuppy answered Sep 22 '22 13:09


Another way could be:

from pyspark.sql.functions import lag
from pyspark.sql.window import Window

# Rows of the same user, ordered by date, so lag() sees the previous tweet.
w = Window.partitionBy("user_id").orderBy("date")

df.withColumn("time_intertweet",
    (df.date.cast("bigint") - lag(df.date.cast("bigint"), 1).over(w))
    .cast("bigint"))
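
For anyone unsure what the lag-over-window expression computes, here is a rough plain-Python sketch of the same logic: partition by user, order by timestamp, and subtract the previous value. It assumes the dates are already epoch seconds; `intertweet_seconds` and the sample rows are hypothetical and not taken from the question's data.

```python
from collections import defaultdict
from typing import List, Optional, Tuple


def intertweet_seconds(
    rows: List[Tuple[int, int]],
) -> List[Tuple[int, int, Optional[int]]]:
    """Hypothetical helper mimicking lag(...).over(partitionBy(user).orderBy(ts)).

    `rows` holds (user_id, epoch_seconds) pairs. Each output row carries the
    gap to the previous row of the same user, or None for a user's first row
    (where lag would yield null).
    """
    by_user = defaultdict(list)
    for user_id, ts in rows:
        by_user[user_id].append(ts)

    result = []
    for user_id in sorted(by_user):
        previous = None
        for ts in sorted(by_user[user_id]):
            gap = None if previous is None else ts - previous
            result.append((user_id, ts, gap))
            previous = ts
    return result


# Made-up sample rows, deliberately out of order.
sample = [(1, 100), (2, 50), (1, 160), (1, 130)]
gaps = intertweet_seconds(sample)
print(gaps)
```

As with the Spark version, the first row of each user has no predecessor, so its gap is empty rather than zero.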
2 revs, 2 users 77% answered Sep 23 '22 13:09


Like this:

df.registerTempTable("df")

sqlContext.sql("""
     SELECT *, CAST(date AS bigint) - CAST(lag(date, 1) OVER (
              PARTITION BY user_id ORDER BY date) AS bigint) AS time_intertweet
     FROM df""")
user6022341 answered Sep 21 '22 13:09