Spark Dataframe sliding window over pair of rows

I have an event log in CSV format consisting of three columns: timestamp, eventId and userId.

What I would like to do is append a new column, nextEventId, to the DataFrame, holding for each row the eventId of the same user's next event.

An example eventlog:

val eventlog = sqlContext.createDataFrame(Seq(
  (20160101, 1, 0),
  (20160102, 3, 1),
  (20160201, 4, 1),
  (20160202, 2, 0)
)).toDF("timestamp", "eventId", "userId")
eventlog.show(4)

+---------+-------+------+
|timestamp|eventId|userId|
+---------+-------+------+
| 20160101|      1|     0|
| 20160102|      3|     1|
| 20160201|      4|     1|
| 20160202|      2|     0|
+---------+-------+------+

The desired end result would be:

+---------+-------+------+-----------+
|timestamp|eventId|userId|nextEventId|
+---------+-------+------+-----------+
| 20160101|      1|     0|          2|
| 20160102|      3|     1|          4|
| 20160201|      4|     1|        Nil|
| 20160202|      2|     0|        Nil|
+---------+-------+------+-----------+

So far I've been messing around with sliding windows, but I can't figure out how to compare two rows:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.asc

val w = Window.partitionBy("userId").orderBy(asc("timestamp")) // should be a sliding window over 2 rows...
val nextNodes = second($"eventId").over(w) // should work if there are only 2 rows
Tim asked Dec 18 '22 at 16:12

1 Answer

What you're looking for is lead (or lag). Using the window you already defined:

import org.apache.spark.sql.functions.lead

eventlog.withColumn("nextEventId", lead("eventId", 1).over(w))
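
Applied to the sample eventlog from the question (with the window w defined there), this gives the following, up to row ordering, which show() does not guarantee:

eventlog.withColumn("nextEventId", lead("eventId", 1).over(w)).show()

+---------+-------+------+-----------+
|timestamp|eventId|userId|nextEventId|
+---------+-------+------+-----------+
| 20160101|      1|     0|          2|
| 20160202|      2|     0|       null|
| 20160102|      3|     1|          4|
| 20160201|      4|     1|       null|
+---------+-------+------+-----------+

Note that the last event of each user gets null rather than Nil, since DataFrames represent missing values as SQL nulls. lag("eventId", 1).over(w) works symmetrically, returning the previous eventId within the partition.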

For a true sliding window (such as a sliding average) you can use the rowsBetween or rangeBetween clauses of the window definition, but that isn't really required here. Nevertheless, example usage could look like this:

val w2 = Window.partitionBy("userId")
  .orderBy(asc("timestamp"))
  .rowsBetween(-1, 0) // frame spanning the previous row and the current row

avg($"foo").over(w2)
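
Here $"foo" is just a placeholder column. As a hypothetical illustration on the same eventlog, a two-row rolling average of eventId per user could look like this (the column name rollingAvg is ours, not from the original answer):

import org.apache.spark.sql.functions.avg

// Sketch: average of the current and previous eventId within each user's
// timeline, using the frame defined by w2 above.
eventlog.withColumn("rollingAvg", avg($"eventId").over(w2)).show()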
zero323 answered Dec 29 '22 at 17:12