
How to use lag and rangeBetween functions on timestamp values?

I have data that looks like this:

userid,eventtime,location_point
4e191908,2017-06-04 03:00:00,18685891
4e191908,2017-06-04 03:04:00,18685891
3136afcb,2017-06-04 03:03:00,18382821
661212dd,2017-06-04 03:06:00,80831484
40e8a7c3,2017-06-04 03:12:00,18825769

I would like to add a new boolean column that marks true if there are 2 or more userid within a 5-minute window at the same location_point. I had an idea of using the lag function to look up over a window partitioned by userid, with the range between the current timestamp and the next 5 minutes:

from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql.functions import col

days = lambda i: i * 60*5 

windowSpec = W.partitionBy(col("userid")).orderBy(col("eventtime").cast("timestamp").cast("long")).rangeBetween(0, days(5))

last_location_point = F.lag(col("location_point"), 1).over(windowSpec)
visit_check = (last_location_point == output.location_point)
output.withColumn("visit_check", visit_check).select("userid", "eventtime", "location_point", "visit_check")

This code gives me an AnalysisException when I use the rangeBetween function:

AnalysisException: u'Window Frame RANGE BETWEEN CURRENT ROW AND 1500 FOLLOWING must match the required frame ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING;

Do you know any way to tackle this problem?

asked Aug 06 '17 by ebertbm

People also ask

What is lead and lag in spark SQL?

Lag is used to access data from n rows prior, and allows the current row to access that data. Lead follows the same logic, but is used to access data from n rows ahead. An example use case is finding previous and next order dates for customers.
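A minimal sketch of that use case (the orders DataFrame and its column names below are illustrative assumptions, not taken from the question):

from pyspark.sql import SparkSession, functions as F
from pyspark.sql import Window as W

spark = SparkSession.builder.getOrCreate()
orders = spark.createDataFrame(
    [("c1", "2017-06-01"), ("c1", "2017-06-10"), ("c1", "2017-06-20")],
    ["customer_id", "order_date"])

w = W.partitionBy("customer_id").orderBy("order_date")
(orders
    .withColumn("prev_order_date", F.lag("order_date", 1).over(w))    # row n-1, null for the first order
    .withColumn("next_order_date", F.lead("order_date", 1).over(w))   # row n+1, null for the last order
    .show())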

What is spark lag function?

The Spark LAG function provides access to a row at a given offset that comes before the current row in the window. This function can be used in a SELECT statement to compare values in the current row with values in a previous row.
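For example, assuming the question's DataFrame has been registered as a view named events (an assumed name, for illustration only), LAG can be used straight in a SELECT:

df.createOrReplaceTempView("events")
spark.sql("""
    SELECT userid, eventtime, location_point,
           LAG(location_point, 1) OVER (PARTITION BY userid ORDER BY eventtime) AS prev_location
    FROM events
""").show()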

How does window function work in spark?

Spark Window functions are used to calculate results such as the rank, row number, etc. over a range of input rows, and these are available to you by importing from org.apache.spark.sql.

Does spark SQL support window functions?

Spark SQL supports three kinds of window functions: ranking functions. analytic functions. aggregate functions.
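A short sketch with one function of each kind over the same window (column names are taken from the question's data):

from pyspark.sql import functions as F
from pyspark.sql import Window as W

w = W.partitionBy("location_point").orderBy("eventtime")
(df
    .withColumn("rank", F.rank().over(w))                     # ranking function
    .withColumn("prev_user", F.lag("userid", 1).over(w))      # analytic function
    .withColumn("running_count", F.count("userid").over(w))   # aggregate function
    .show())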


2 Answers

Given your data, let's add a column with a timestamp in seconds:

from pyspark.sql import functions as F
from pyspark.sql import Window

df = df.withColumn('timestamp', df.eventtime.cast('timestamp').cast('long'))
df.show()

+--------+-------------------+--------------+----------+
|  userid|          eventtime|location_point| timestamp|  
+--------+-------------------+--------------+----------+
|4e191908|2017-06-04 03:00:00|      18685891|1496545200|
|4e191908|2017-06-04 03:04:00|      18685891|1496545440|
|3136afcb|2017-06-04 03:03:00|      18382821|1496545380|
|661212dd|2017-06-04 03:06:00|      80831484|1496545560|
|40e8a7c3|2017-06-04 03:12:00|      18825769|1496545920|
|4e191908|2017-06-04 03:11:30|      18685891|1496545890|
+--------+-------------------+--------------+----------+  

Now, let's define a window function with a partition by location_point, an order by timestamp, and a range between -300 s and the current time. We can count the number of elements in this window and put this count in a column named 'occurrences_in_5_min':

w = Window.partitionBy('location_point').orderBy('timestamp').rangeBetween(-60*5,0)
df = df.withColumn('occurrences_in_5_min',F.count('timestamp').over(w))
df.show()

+--------+-------------------+--------------+----------+--------------------+
|  userid|          eventtime|location_point| timestamp|occurrences_in_5_min|
+--------+-------------------+--------------+----------+--------------------+
|40e8a7c3|2017-06-04 03:12:00|      18825769|1496545920|                   1|
|3136afcb|2017-06-04 03:03:00|      18382821|1496545380|                   1|
|661212dd|2017-06-04 03:06:00|      80831484|1496545560|                   1|
|4e191908|2017-06-04 03:00:00|      18685891|1496545200|                   1|
|4e191908|2017-06-04 03:04:00|      18685891|1496545440|                   2|
|4e191908|2017-06-04 03:11:30|      18685891|1496545890|                   1|
+--------+-------------------+--------------+----------+--------------------+

Now you can add the desired column with True if the number of occurrences is strictly greater than 1 in the last 5 minutes at a particular location:

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

add_bool = udf(lambda col: True if col > 1 else False, BooleanType())
df = df.withColumn('already_occured', add_bool('occurrences_in_5_min'))
df.show()

+--------+-------------------+--------------+----------+--------------------+---------------+
|  userid|          eventtime|location_point| timestamp|occurrences_in_5_min|already_occured|
+--------+-------------------+--------------+----------+--------------------+---------------+
|40e8a7c3|2017-06-04 03:12:00|      18825769|1496545920|                   1|          false|
|3136afcb|2017-06-04 03:03:00|      18382821|1496545380|                   1|          false|
|661212dd|2017-06-04 03:06:00|      80831484|1496545560|                   1|          false|
|4e191908|2017-06-04 03:00:00|      18685891|1496545200|                   1|          false|
|4e191908|2017-06-04 03:04:00|      18685891|1496545440|                   2|           true|
|4e191908|2017-06-04 03:11:30|      18685891|1496545890|                   1|          false|
+--------+-------------------+--------------+----------+--------------------+---------------+
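As a side note, the same boolean column can be produced without a Python UDF by comparing the count column directly; a minimal sketch:

df = df.withColumn('already_occured', F.col('occurrences_in_5_min') > 1)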
answered by plalanne


rangeBetween just doesn't make sense for a non-aggregate function like lag. lag always takes a specific row, denoted by the offset argument, so specifying a frame is pointless.

To get a window over time series you can use window grouping with standard aggregates:

from pyspark.sql.functions import window,  countDistinct


(df
    .groupBy("location_point", window("eventtime", "5 minutes"))
    .agg( countDistinct("userid")))

You can add more arguments to modify slide duration.
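For example, passing a slide duration as the third argument gives overlapping 5-minute windows (the 1-minute slide here is just an illustrative value):

(df
    .groupBy("location_point", window("eventtime", "5 minutes", "1 minute"))
    .agg(countDistinct("userid")))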

You can try something similar with window functions if you partition by location_point:

windowSpec = (W.partitionBy(col("location"))
  .orderBy(col("eventtime").cast("timestamp").cast("long"))
  .rangeBetween(0, days(5)))


df.withColumn("id_count", countDistinct("userid").over(windowSpec))
answered by Alper t. Turker