I have an event log in CSV format consisting of three columns: timestamp, eventId and userId.
What I would like to do is append a new column nextEventId to the dataframe, containing the eventId of the next event for the same user (ordered by timestamp).
An example eventlog:
eventlog = sqlContext.createDataFrame(Array((20160101, 1, 0),(20160102,3,1),(20160201,4,1),(20160202, 2,0))).toDF("timestamp", "eventId", "userId")
eventlog.show(4)
+---------+-------+------+
|timestamp|eventId|userId|
+---------+-------+------+
| 20160101| 1| 0|
| 20160102| 3| 1|
| 20160201| 4| 1|
| 20160202| 2| 0|
+---------+-------+------+
The desired end result would be:
+---------+-------+------+-----------+
|timestamp|eventId|userId|nextEventId|
+---------+-------+------+-----------+
| 20160101| 1| 0| 2|
| 20160102| 3| 1| 4|
| 20160201| 4| 1| Nil|
| 20160202| 2| 0| Nil|
+---------+-------+------+-----------+
So far I've been messing around with sliding windows but can't figure out how to compare 2 rows...
val w = Window.partitionBy("userId").orderBy(asc("timestamp")) //should be a sliding window over 2 rows...
val nextNodes = second($"eventId").over(w) //should work if there are only 2 rows
What you're looking for is lead (or lag). Using the window you already defined:
import org.apache.spark.sql.functions.lead
eventlog.withColumn("nextEventId", lead("eventId", 1).over(w))
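For completeness, here is a minimal sketch of the lag counterpart over the same window (lag looks backwards instead of forwards). Note that rows without a match get null, not Nil, so the last event per user will have a null nextEventId with lead, and the first event per user a null prevEventId with lag:
import org.apache.spark.sql.functions.lag
// previous eventId for the same user, ordered by timestamp
eventlog.withColumn("prevEventId", lag("eventId", 1).over(w))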
For a true sliding window (like a sliding average) you can use the rowsBetween or rangeBetween clauses of the window definition, but it is not really required here. Nevertheless, example usage could look like this:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{asc, avg}

// frame spanning the previous row and the current row within each user partition
val w2 = Window.partitionBy("userId")
  .orderBy(asc("timestamp"))
  .rowsBetween(-1, 0)
avg($"foo").over(w2)