Problem: Given time series data, a clickstream of user activity stored in Hive, the task is to enrich the data with a session id using Spark.
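Session Definition:
1. A session expires after 1 hour of inactivity.
2. A session remains active for a total duration of 2 hours.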
Data:
click_time,user_id
2018-01-01 11:00:00,u1
2018-01-01 12:10:00,u1
2018-01-01 13:00:00,u1
2018-01-01 13:50:00,u1
2018-01-01 14:40:00,u1
2018-01-01 15:30:00,u1
2018-01-01 16:20:00,u1
2018-01-01 16:50:00,u1
2018-01-01 11:00:00,u2
2018-01-02 11:00:00,u2
Below is partial solution considering only 1st point in session definition:
val win1 = Window.partitionBy("user_id").orderBy("click_time")
val sessionnew = when((unix_timestamp($"click_time") - unix_timestamp(lag($"click_time",1,"2017-01-01 11:00:00.0").over(win1)))/60 >= 60, 1).otherwise(0)
userActivity
.withColumn("session_num",sum(sessionnew).over(win1))
.withColumn("session_id",concat($"user_id", $"session_num"))
.show(truncate = false)
Actual Output:
+---------------------+-------+-----------+----------+
|click_time |user_id|session_num|session_id|
+---------------------+-------+-----------+----------+
|2018-01-01 11:00:00.0|u1 |1 |u11 |
|2018-01-01 12:10:00.0|u1 |2 |u12 | -- session u12 starts
|2018-01-01 13:00:00.0|u1 |2 |u12 |
|2018-01-01 13:50:00.0|u1 |2 |u12 |
|2018-01-01 14:40:00.0|u1 |2 |u12 | -- this should be a new session as diff of session start of u12 and this row exceeds 2 hours
|2018-01-01 15:30:00.0|u1 |2 |u12 |
|2018-01-01 16:20:00.0|u1 |2 |u12 |
|2018-01-01 16:50:00.0|u1 |2 |u12 | -- now this has to be compared with row 5 to find difference
|2018-01-01 11:00:00.0|u2 |1 |u21 |
|2018-01-02 11:00:00.0|u2 |2 |u22 |
+---------------------+-------+-----------+----------+
To include the second condition, I tried taking the difference between the current time and the last session's start time to check whether it exceeds 2 hours. However, the reference itself changes for the following rows. Some use cases like this can be handled with a running sum, but that doesn't suit here.
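A minimal plain-Scala sketch of why a fixed reference point is not enough, using u1's clicks from 12:10 to 16:50 expressed as seconds since 12:10. The object and method names are made up for illustration, and the `>=` comparison is an assumption about how the 2-hour limit is applied.

```scala
object FixedReferenceDemo {
  // Seconds elapsed since 12:10:00 for u1's clicks from 12:10 to 16:50 (from the sample data).
  val elapsed: List[Long] = List(0L, 3000L, 6000L, 9000L, 12000L, 15000L, 16800L)
  val maxDuration: Long = 2 * 60 * 60

  // Fixed reference: bucket every click by its distance from the first session start.
  def fixedBuckets(xs: List[Long]): List[Long] = xs.map(_ / maxDuration)

  // Moving reference: the session start is reset each time the 2-hour limit is reached.
  def movingBuckets(xs: List[Long]): List[Long] =
    xs.foldLeft((List.empty[Long], 0L, 0L)) { case ((acc, start, n), t) =>
      if (t - start >= maxDuration) ((n + 1) :: acc, t, n + 1)
      else (n :: acc, start, n)
    }._1.reverse
}
```

With a fixed reference the 16:20 click already falls into a third bucket, whereas with a moving reference the session that starts at 14:40 still contains 16:20 and only 16:50 opens a new one.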
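Not a straightforward problem to solve, but here's one approach:
1. Use Window lag timestamp difference to identify sessions (with 0 = start of a session) per user for rule #1
2. Group the dataset to assemble the timestamp diff list per user
3. Process the timestamp diff list via a UDF to identify sessions for rule #2 and create all session ids per user
4. Expand the grouped dataset via explode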
Sample code below:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._
val userActivity = Seq(
("2018-01-01 11:00:00", "u1"),
("2018-01-01 12:10:00", "u1"),
("2018-01-01 13:00:00", "u1"),
("2018-01-01 13:50:00", "u1"),
("2018-01-01 14:40:00", "u1"),
("2018-01-01 15:30:00", "u1"),
("2018-01-01 16:20:00", "u1"),
("2018-01-01 16:50:00", "u1"),
("2018-01-01 11:00:00", "u2"),
("2018-01-02 11:00:00", "u2")
).toDF("click_time", "user_id")
def clickSessList(tmo: Long) = udf{ (uid: String, clickList: Seq[String], tsList: Seq[Long]) =>
def sid(n: Long) = s"$uid-$n"
val sessList = tsList.foldLeft( (List[String](), 0L, 0L) ){ case ((ls, j, k), i) =>
if (i == 0 || j + i >= tmo) (sid(k + 1) :: ls, 0L, k + 1) else
(sid(k) :: ls, j + i, k)
}._1.reverse
clickList zip sessList
}
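The fold inside the UDF can also be tried outside Spark. The sketch below copies the same logic into a plain function; the object name `SessionFold` and the method name `sessionIds` are hypothetical and exist only for this illustration.

```scala
object SessionFold {
  // Same fold as in the UDF, minus the Spark wrapper.
  // tsList holds per-click diffs in seconds, with 0 marking a session start from rule #1.
  def sessionIds(uid: String, tsList: Seq[Long], tmo: Long): List[String] = {
    def sid(n: Long) = s"$uid-$n"
    tsList.foldLeft((List[String](), 0L, 0L)) { case ((ls, j, k), i) =>
      // Start a new session on a rule #1 marker or when the accumulated time reaches tmo.
      if (i == 0 || j + i >= tmo) (sid(k + 1) :: ls, 0L, k + 1)
      else (sid(k) :: ls, j + i, k)
    }._1.reverse
  }
}
```

For example, `SessionFold.sessionIds("u1", Seq(0L, 0L, 3000L, 3000L, 3000L, 3000L, 3000L, 1800L), 7200L)` walks through u1's diffs and yields one session id per click.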
Note that the accumulator for foldLeft in the UDF is a Tuple of (ls, j, k), where:
- ls is the list of formatted session ids to be returned
- j and k are for carrying over the conditionally changing timestamp value and session id number, respectively, to the next iteration
Step 1:
val tmo1: Long = 60 * 60
val tmo2: Long = 2 * 60 * 60
val win1 = Window.partitionBy("user_id").orderBy("click_time")
val df1 = userActivity.
withColumn("ts_diff", unix_timestamp($"click_time") - unix_timestamp(
lag($"click_time", 1).over(win1))
).
withColumn("ts_diff", when(row_number.over(win1) === 1 || $"ts_diff" >= tmo1, 0L).
otherwise($"ts_diff")
)
df1.show
// +-------------------+-------+-------+
// | click_time|user_id|ts_diff|
// +-------------------+-------+-------+
// |2018-01-01 11:00:00| u1| 0|
// |2018-01-01 12:10:00| u1| 0|
// |2018-01-01 13:00:00| u1| 3000|
// |2018-01-01 13:50:00| u1| 3000|
// |2018-01-01 14:40:00| u1| 3000|
// |2018-01-01 15:30:00| u1| 3000|
// |2018-01-01 16:20:00| u1| 3000|
// |2018-01-01 16:50:00| u1| 1800|
// |2018-01-01 11:00:00| u2| 0|
// |2018-01-02 11:00:00| u2| 0|
// +-------------------+-------+-------+
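To check the ts_diff values for a single user by hand, the same rule can be mirrored on a plain collection. This is only a sketch: the object `TsDiff` and its method are hypothetical names, and parsing the strings as UTC is an assumption of convenience, since only the differences matter here.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

object TsDiff {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Assumption: interpret the strings as UTC; only differences are used below.
  private def epochSec(s: String): Long =
    LocalDateTime.parse(s, fmt).toEpochSecond(ZoneOffset.UTC)

  // clicks must already be sorted by time and belong to a single user.
  def tsDiffs(clicks: Seq[String], tmo1: Long): Seq[Long] = {
    val secs = clicks.map(epochSec).toVector
    secs.zipWithIndex.map { case (t, idx) =>
      if (idx == 0) 0L // first click of the user starts a session
      else {
        val d = t - secs(idx - 1)
        if (d >= tmo1) 0L else d // an inactivity gap of tmo1 or more starts a session
      }
    }
  }
}
```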
Steps 2-4:
val df2 = df1.
groupBy("user_id").agg(
collect_list($"click_time").as("click_list"), collect_list($"ts_diff").as("ts_list")
).
withColumn("click_sess_id",
explode(clickSessList(tmo2)($"user_id", $"click_list", $"ts_list"))
).
select($"user_id", $"click_sess_id._1".as("click_time"), $"click_sess_id._2".as("sess_id"))
df2.show
// +-------+-------------------+-------+
// |user_id|click_time |sess_id|
// +-------+-------------------+-------+
// |u1 |2018-01-01 11:00:00|u1-1 |
// |u1 |2018-01-01 12:10:00|u1-2 |
// |u1 |2018-01-01 13:00:00|u1-2 |
// |u1 |2018-01-01 13:50:00|u1-2 |
// |u1 |2018-01-01 14:40:00|u1-3 |
// |u1 |2018-01-01 15:30:00|u1-3 |
// |u1 |2018-01-01 16:20:00|u1-3 |
// |u1 |2018-01-01 16:50:00|u1-4 |
// |u2 |2018-01-01 11:00:00|u2-1 |
// |u2 |2018-01-02 11:00:00|u2-2 |
// +-------+-------------------+-------+
Also note that click_time is "passed thru" in steps 2-4 so as to be included in the final dataset.
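One caveat worth keeping in mind: as far as I know, collect_list gives no ordering guarantee after a shuffle, so the lists handed to the UDF may not always arrive in click order. A minimal defensive sketch, assuming the two lists are aligned element by element, is to pair them up and sort by click time before folding. The object and method names here are hypothetical.

```scala
object OrderedInput {
  // Pair each click time with its diff, sort by click time, then split again.
  // Lexicographic order matches chronological order for "yyyy-MM-dd HH:mm:ss" strings.
  def sortByClick(clickList: Seq[String], tsList: Seq[Long]): (Seq[String], Seq[Long]) =
    clickList.zip(tsList).sortBy(_._1).unzip
}
```

Inside the UDF, the sorted pair of lists would then replace clickList and tsList before the foldLeft runs.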
Though the answer provided by Leo works perfectly, I feel that using collect and explode functions is a complicated way to solve the problem. It can instead be solved the Spark way with a UDAF, which also makes future modifications easier. Please take a look at a solution along similar lines below.
scala> //Importing Packages
scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> // Create UDAF to calculate total session duration based on the session inactivity flag and the current session duration
scala> import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.MutableAggregationBuffer
scala> import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala>
scala> class TotalSessionDuration extends UserDefinedAggregateFunction {
| // This is the input fields for your aggregate function.
| override def inputSchema: org.apache.spark.sql.types.StructType =
| StructType(
| StructField("sessiondur", LongType) :: StructField(
| "inactivityInd",
| IntegerType
| ) :: Nil
| )
|
| // This is the internal fields you keep for computing your aggregate.
| override def bufferSchema: StructType = StructType(
| StructField("sessionSum", LongType) :: Nil
| )
|
| // This is the output type of your aggregation function.
| override def dataType: DataType = LongType
|
| override def deterministic: Boolean = true
|
| // This is the initial value for your buffer schema.
| override def initialize(buffer: MutableAggregationBuffer): Unit = {
| buffer(0) = 0L
| }
|
| // This is how to update your buffer schema given an input.
| override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
| if (input.getAs[Int](1) == 1)
| buffer(0) = 0L
| else if (buffer.getAs[Long](0) >= 7200L)
| buffer(0) = input.getAs[Long](0)
| else
| buffer(0) = buffer.getAs[Long](0) + input.getAs[Long](0)
| }
|
| // This is how to merge two objects with the bufferSchema type.
| // The buffer holds a single field (sessionSum), so only index 0 can be read here.
| // The logic is order-dependent, so this UDAF is meant to be used over an ordered window.
| override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
| if (buffer2.getAs[Long](0) >= 7200L)
| buffer1(0) = buffer2.getAs[Long](0)
| else
| buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)
| }
| // This is where you output the final value, given the final value of your bufferSchema.
| override def evaluate(buffer: Row): Any = {
| buffer.getLong(0)
| }
| }
defined class TotalSessionDuration
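To see what the update logic produces row by row, here is a plain-Scala sketch that mirrors the three branches of update() on (sessiondur, inactivityInd) pairs. The object `SessionSumSim` and its methods are hypothetical names, and the running result assumes the rows are visited in timestamp order, one at a time.

```scala
object SessionSumSim {
  // Mirrors the update() branches of TotalSessionDuration.
  def step(buffer: Long, dur: Long, inactive: Int): Long =
    if (inactive == 1) 0L            // inactivity flag set: reset the counter
    else if (buffer >= 7200L) dur    // previous total reached 2 hours: restart from this diff
    else buffer + dur                // otherwise keep accumulating

  // Buffer value after each row, assuming rows are processed in order.
  def running(rows: Seq[(Long, Int)]): Seq[Long] =
    rows.scanLeft(0L) { case (buf, (dur, flag)) => step(buf, dur, flag) }.tail
}
```

Feeding it u1's (last_session_activity_after, SEF) pairs gives the sessionSum values that the later `> 7200` check is applied to.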
scala> //Create handle for using the UDAF defined above
scala> val sessionSum=spark.udf.register("sessionSum", new TotalSessionDuration)
sessionSum: org.apache.spark.sql.expressions.UserDefinedAggregateFunction = TotalSessionDuration@64a9719a
scala> //Create Session Dataframe
scala> val clickstream = Seq(
| ("2018-01-01T11:00:00Z", "u1"),
| ("2018-01-01T12:10:00Z", "u1"),
| ("2018-01-01T13:00:00Z", "u1"),
| ("2018-01-01T13:50:00Z", "u1"),
| ("2018-01-01T14:40:00Z", "u1"),
| ("2018-01-01T15:30:00Z", "u1"),
| ("2018-01-01T16:20:00Z", "u1"),
| ("2018-01-01T16:50:00Z", "u1"),
| ("2018-01-01T11:00:00Z", "u2"),
| ("2018-01-02T11:00:00Z", "u2")
| ).toDF("timestamp", "userid").withColumn("curr_timestamp",unix_timestamp($"timestamp", "yyyy-MM-dd'T'HH:mm:ss'Z'").cast(TimestampType)).drop("timestamp")
clickstream: org.apache.spark.sql.DataFrame = [userid: string, curr_timestamp: timestamp]
scala>
scala> clickstream.show(false)
+------+-------------------+
|userid|curr_timestamp |
+------+-------------------+
|u1 |2018-01-01 11:00:00|
|u1 |2018-01-01 12:10:00|
|u1 |2018-01-01 13:00:00|
|u1 |2018-01-01 13:50:00|
|u1 |2018-01-01 14:40:00|
|u1 |2018-01-01 15:30:00|
|u1 |2018-01-01 16:20:00|
|u1 |2018-01-01 16:50:00|
|u2 |2018-01-01 11:00:00|
|u2 |2018-01-02 11:00:00|
+------+-------------------+
scala> //Generate column SEF with values 0 or 1 depending on whether difference between current and previous activity time is greater than 1 hour=3600 sec
scala>
scala> //Window on Current Timestamp when last activity took place
scala> val windowOnTs = Window.partitionBy("userid").orderBy("curr_timestamp")
windowOnTs: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@41dabe47
scala> //Create Lag Expression to find previous timestamp for the User
scala> val lagOnTS = lag(col("curr_timestamp"), 1).over(windowOnTs)
lagOnTS: org.apache.spark.sql.Column = lag(curr_timestamp, 1, NULL) OVER (PARTITION BY userid ORDER BY curr_timestamp ASC NULLS FIRST unspecifiedframe$())
scala> //Compute Timestamp for previous activity and subtract the same from Timestamp for current activity to get difference between 2 activities
scala> val diff_secs_col = col("curr_timestamp").cast("long") - col("prev_timestamp").cast("long")
diff_secs_col: org.apache.spark.sql.Column = (CAST(curr_timestamp AS BIGINT) - CAST(prev_timestamp AS BIGINT))
scala> val UserActWindowed=clickstream.withColumn("prev_timestamp", lagOnTS).withColumn("last_session_activity_after", diff_secs_col ).na.fill(0, Array("last_session_activity_after"))
UserActWindowed: org.apache.spark.sql.DataFrame = [userid: string, curr_timestamp: timestamp ... 2 more fields]
scala> //Generate Flag Column SEF (Session Expiry Flag) to indicate Session Has Expired due to inactivity for more than 1 hour
scala> val UserSessionFlagWhenInactive=UserActWindowed.withColumn("SEF",when(col("last_session_activity_after")>3600, 1).otherwise(0)).withColumn("tempsessid",sum(col("SEF")) over windowOnTs)
UserSessionFlagWhenInactive: org.apache.spark.sql.DataFrame = [userid: string, curr_timestamp: timestamp ... 4 more fields]
scala> UserSessionFlagWhenInactive.show(false)
+------+-------------------+-------------------+---------------------------+---+----------+
|userid|curr_timestamp |prev_timestamp |last_session_activity_after|SEF|tempsessid|
+------+-------------------+-------------------+---------------------------+---+----------+
|u1 |2018-01-01 11:00:00|null |0 |0 |0 |
|u1 |2018-01-01 12:10:00|2018-01-01 11:00:00|4200 |1 |1 |
|u1 |2018-01-01 13:00:00|2018-01-01 12:10:00|3000 |0 |1 |
|u1 |2018-01-01 13:50:00|2018-01-01 13:00:00|3000 |0 |1 |
|u1 |2018-01-01 14:40:00|2018-01-01 13:50:00|3000 |0 |1 |
|u1 |2018-01-01 15:30:00|2018-01-01 14:40:00|3000 |0 |1 |
|u1 |2018-01-01 16:20:00|2018-01-01 15:30:00|3000 |0 |1 |
|u1 |2018-01-01 16:50:00|2018-01-01 16:20:00|1800 |0 |1 |
|u2 |2018-01-01 11:00:00|null |0 |0 |0 |
|u2 |2018-01-02 11:00:00|2018-01-01 11:00:00|86400 |1 |1 |
+------+-------------------+-------------------+---------------------------+---+----------+
scala> //Compute Total session duration using the UDAF TotalSessionDuration such that :
scala> //(i)counter will be reset to 0 if SEF is set to 1
scala> //(ii)or set it to current session duration if session exceeds 2 hours
scala> //(iii)If both of them are inapplicable accumulate the sum
scala> val UserSessionDur=UserSessionFlagWhenInactive.withColumn("sessionSum",sessionSum(col("last_session_activity_after"),col("SEF")) over windowOnTs)
UserSessionDur: org.apache.spark.sql.DataFrame = [userid: string, curr_timestamp: timestamp ... 5 more fields]
scala> //Generate Session Marker if SEF is 1 or sessionSum Exceeds 2 hours(7200) seconds
scala> val UserNewSessionMarker=UserSessionDur.withColumn("SessionFlagChangeIndicator",when(col("SEF")===1 || col("sessionSum")>7200, 1).otherwise(0) )
UserNewSessionMarker: org.apache.spark.sql.DataFrame = [userid: string, curr_timestamp: timestamp ... 6 more fields]
scala> //Create New Session ID based on the marker
scala> val computeSessionId=UserNewSessionMarker.drop("SEF","tempsessid","sessionSum").withColumn("sessid",concat(col("userid"),lit("-"),(sum(col("SessionFlagChangeIndicator")) over windowOnTs)+1.toLong))
computeSessionId: org.apache.spark.sql.DataFrame = [userid: string, curr_timestamp: timestamp ... 4 more fields]
scala> computeSessionId.show(false)
+------+-------------------+-------------------+---------------------------+--------------------------+------+
|userid|curr_timestamp |prev_timestamp |last_session_activity_after|SessionFlagChangeIndicator|sessid|
+------+-------------------+-------------------+---------------------------+--------------------------+------+
|u1 |2018-01-01 11:00:00|null |0 |0 |u1-1 |
|u1 |2018-01-01 12:10:00|2018-01-01 11:00:00|4200 |1 |u1-2 |
|u1 |2018-01-01 13:00:00|2018-01-01 12:10:00|3000 |0 |u1-2 |
|u1 |2018-01-01 13:50:00|2018-01-01 13:00:00|3000 |0 |u1-2 |
|u1 |2018-01-01 14:40:00|2018-01-01 13:50:00|3000 |1 |u1-3 |
|u1 |2018-01-01 15:30:00|2018-01-01 14:40:00|3000 |0 |u1-3 |
|u1 |2018-01-01 16:20:00|2018-01-01 15:30:00|3000 |0 |u1-3 |
|u1 |2018-01-01 16:50:00|2018-01-01 16:20:00|1800 |1 |u1-4 |
|u2 |2018-01-01 11:00:00|null |0 |0 |u2-1 |
|u2 |2018-01-02 11:00:00|2018-01-01 11:00:00|86400 |1 |u2-2 |
+------+-------------------+-------------------+---------------------------+--------------------------+------+
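The last step, turning the 0/1 markers into session ids, is just a running sum shifted by one. A small plain-Scala sketch of that idea, with a hypothetical object name, using the SessionFlagChangeIndicator values from the table above:

```scala
object SessionIdFromFlags {
  // Running sum of the 0/1 markers, shifted by 1 so the first session is numbered 1.
  def ids(userid: String, flags: Seq[Int]): Seq[String] =
    flags.scanLeft(0)(_ + _).tail.map(n => s"$userid-${n + 1}")
}
```

For u1 the markers are 0, 1, 0, 0, 1, 0, 0, 1, so `SessionIdFromFlags.ids("u1", Seq(0, 1, 0, 0, 1, 0, 0, 1))` reproduces the sessid column.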