Suppose I have a DataFrame of events with time difference between each row, the main rule is that one visit is counted if only the event has been within 5 minutes of the previous or next event:
+--------+-------------------+--------+
|userid |eventtime |timeDiff|
+--------+-------------------+--------+
|37397e29|2017-06-04 03:00:00|60 |
|37397e29|2017-06-04 03:01:00|60 |
|37397e29|2017-06-04 03:02:00|60 |
|37397e29|2017-06-04 03:03:00|180 |
|37397e29|2017-06-04 03:06:00|60 |
|37397e29|2017-06-04 03:07:00|420 |
|37397e29|2017-06-04 03:14:00|60 |
|37397e29|2017-06-04 03:15:00|1140 |
|37397e29|2017-06-04 03:34:00|540 |
|37397e29|2017-06-04 03:53:00|540 |
+--------+----------------- -+--------+
The challenge is to group by the start_time and end_time of the latest eventtime that has the condition of being within 5 minutes. The output should be like this table:
+--------+-------------------+--------------------+-----------+
|userid |start_time |end_time |events |
+--------+-------------------+--------------------+-----------+
|37397e29|2017-06-04 03:00:00|2017-06-04 03:07:00 |6 |
|37397e29|2017-06-04 03:14:00|2017-06-04 03:15:00 |2 |
+--------+-------------------+--------------------+-----------+
So far I have used window lag functions and some conditions, however, I do not know where to go from here:
%spark.pyspark
from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql.functions import col
windowSpec = W.partitionBy(result_poi["userid"], result_poi["unique_reference_number"]).orderBy(result_poi["eventtime"])
windowSpecDesc = W.partitionBy(result_poi["userid"], result_poi["unique_reference_number"]).orderBy(result_poi["eventtime"].desc())
# The windows are between the current row and following row. e.g: 3:00pm and 3:03pm
nextEventTime = F.lag(col("eventtime"), -1).over(windowSpec)
# The windows are between the current row and following row. e.g: 3:00pm and 3:03pm
previousEventTime = F.lag(col("eventtime"), 1).over(windowSpec)
diffEventTime = nextEventTime - col("eventtime")
nextTimeDiff = F.coalesce((F.unix_timestamp(nextEventTime)
- F.unix_timestamp('eventtime')), F.lit(0))
previousTimeDiff = F.coalesce((F.unix_timestamp('eventtime') -F.unix_timestamp(previousEventTime)), F.lit(0))
# Check if the next POI is the equal to the current POI and has a time differnce less than 5 minutes.
validation = F.coalesce(( (nextTimeDiff < 300) | (previousTimeDiff < 300) ), F.lit(False))
# Change True to 1
visitCheck = F.coalesce((validation == True).cast("int"), F.lit(1))
result_poi.withColumn("visit_check", visitCheck).withColumn("nextTimeDiff", nextTimeDiff).select("userid", "eventtime", "nextTimeDiff", "visit_check").orderBy("eventtime")
My questions: Is this a viable approach, and if so, how can I "go forward" and look at the maximum eventtime that fulfill the 5 minutes condition. To my knowledge, iterate through values of a Spark SQL Column, is it possible? wouldn't it be too expensive?. Is there another way to achieve this result?
Result of Solution suggested by @Aku:
+--------+--------+---------------------+---------------------+------+
|userid |subgroup|start_time |end_time |events|
+--------+--------+--------+------------+---------------------+------+
|37397e29|0 |2017-06-04 03:00:00.0|2017-06-04 03:06:00.0|5 |
|37397e29|1 |2017-06-04 03:07:00.0|2017-06-04 03:14:00.0|2 |
|37397e29|2 |2017-06-04 03:15:00.0|2017-06-04 03:15:00.0|1 |
|37397e29|3 |2017-06-04 03:34:00.0|2017-06-04 03:43:00.0|2 |
+------------------------------------+-----------------------+-------+
It doesn't give the result expected. 3:07 - 3:14 and 03:34-03:43 are being counted as ranges within 5 minutes, it shouldn't be like that. Also, 3:07 should be the end_time in the first row as it is within 5 minutes of the previous row 3:06.
You'll need one extra window function and a groupby
to achieve this.
What we want is for every line with timeDiff greater than 300 to be the end of a group and the start of a new one. Aku's solution should work, only the indicators mark the start of a group instead of the end. To change this you'll have to do a cumulative sum up to n-1 instead of n (n being your current line):
w = Window.partitionBy("userid").orderBy("eventtime")
DF = DF.withColumn("indicator", (DF.timeDiff > 300).cast("int"))
DF = DF.withColumn("subgroup", func.sum("indicator").over(w) - func.col("indicator"))
DF = DF.groupBy("subgroup").agg(
func.min("eventtime").alias("start_time"),
func.max("eventtime").alias("end_time"),
func.count("*").alias("events")
)
+--------+-------------------+-------------------+------+
|subgroup| start_time| end_time|events|
+--------+-------------------+-------------------+------+
| 0|2017-06-04 03:00:00|2017-06-04 03:07:00| 6|
| 1|2017-06-04 03:14:00|2017-06-04 03:15:00| 2|
| 2|2017-06-04 03:34:00|2017-06-04 03:34:00| 1|
| 3|2017-06-04 03:53:00|2017-06-04 03:53:00| 1|
+--------+-------------------+-------------------+------+
It seems that you also filter out lines with only one event, hence:
DF = DF.filter("events != 1")
+--------+-------------------+-------------------+------+
|subgroup| start_time| end_time|events|
+--------+-------------------+-------------------+------+
| 0|2017-06-04 03:00:00|2017-06-04 03:07:00| 6|
| 1|2017-06-04 03:14:00|2017-06-04 03:15:00| 2|
+--------+-------------------+-------------------+------+
So if I understand this correctly you essentially want to end each group when TimeDiff > 300? This seems relatively straightforward with rolling window functions:
First some imports
from pyspark.sql.window import Window
import pyspark.sql.functions as func
Then setting windows, I assumed you would partition by userid
w = Window.partitionBy("userid").orderBy("eventtime")
Then figuring out what subgroup each observation falls into, by first marking the first member of each group, then summing the column.
indicator = (TimeDiff > 300).cast("integer")
subgroup = func.sum(indicator).over(w).alias("subgroup")
Then some aggregation functions and you should be done
DF = DF.select("*", subgroup)\
.groupBy("subgroup")\
.agg(
func.min("eventtime").alias("start_time"),
func.max("eventtime").alias("end_time"),
func.count(func.lit(1)).alias("events")
)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With