Is proper event-time sessionization possible with Spark Structured Streaming?

I've been playing around with Spark Structured Streaming and mapGroupsWithState (specifically following the StructuredSessionization example in the Spark source). I want to confirm some limitations I believe exist with mapGroupsWithState for my use case.

For my purposes, a session is a period of uninterrupted activity for a user such that no two chronologically ordered events (ordered by event time, not processing time) are separated by more than some developer-defined gap duration (30 minutes is common).

An example will help before jumping into code:

{"event_time": "2018-01-01T00:00:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:01:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:05:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:45:00", "user_id": "mike"}

For the stream above, with sessions delimited by a 30-minute period of inactivity, we should end up with one session so far in a streaming context (a second session has started but has yet to complete):

[
  {
    "user_id": "mike",
    "startTimestamp": "2018-01-01T00:00:00",
    "endTimestamp": "2018-01-01T00:05:00"
  }
]

Now consider the following Spark driver program:

import java.sql.Timestamp

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

object StructuredSessionizationV2 {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[2]")
      .appName("StructuredSessionizationRedux")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    implicit val ctx = spark.sqlContext
    val input = MemoryStream[String]

    val EVENT_SCHEMA = new StructType()
      .add($"event_time".string)
      .add($"user_id".string)

    val events = input.toDS()
      .select(from_json($"value", EVENT_SCHEMA).alias("json"))
      .select($"json.*")
      .withColumn("event_time", to_timestamp($"event_time"))
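      // Watermark: max(event_time) seen so far minus 1 hour. Event-time
      // timeouts fire only once this watermark passes the timeout timestamp.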
      .withWatermark("event_time", "1 hours")
    events.printSchema()

    val sessionized = events
      .groupByKey(row => row.getAs[String]("user_id"))
      .mapGroupsWithState[SessionState, SessionOutput](GroupStateTimeout.EventTimeTimeout) {
      case (userId: String, events: Iterator[Row], state: GroupState[SessionState]) =>
        println(s"state update for user ${userId} (current watermark: ${new Timestamp(state.getCurrentWatermarkMs())})")
        if (state.hasTimedOut) {
          println(s"User ${userId} has timed out, sending final output.")
          val finalOutput = SessionOutput(
            userId = userId,
            startTimestampMs = state.get.startTimestampMs,
            endTimestampMs = state.get.endTimestampMs,
            durationMs = state.get.durationMs,
            expired = true
          )
          // Drop this user's state
          state.remove()
          finalOutput
        } else {
          val timestamps = events.map(_.getAs[Timestamp]("event_time").getTime).toSeq
          println(s"User ${userId} has new events (min: ${new Timestamp(timestamps.min)}, max: ${new Timestamp(timestamps.max)}).")
          val newState = if (state.exists) {
            println(s"User ${userId} has existing state.")
            val oldState = state.get
            SessionState(
              startTimestampMs = math.min(oldState.startTimestampMs, timestamps.min),
              endTimestampMs = math.max(oldState.endTimestampMs, timestamps.max)
            )
          } else {
            println(s"User ${userId} has no existing state.")
            SessionState(
              startTimestampMs = timestamps.min,
              endTimestampMs = timestamps.max
            )
          }
          state.update(newState)
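          // The timeout fires when the *watermark* passes
          // endTimestampMs + 30 minutes, not when event time simply advances.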
          state.setTimeoutTimestamp(newState.endTimestampMs, "30 minutes")
          println(s"User ${userId} state updated. Timeout now set to ${new Timestamp(newState.endTimestampMs + (30 * 60 * 1000))}")
          SessionOutput(
            userId = userId,
            startTimestampMs = state.get.startTimestampMs,
            endTimestampMs = state.get.endTimestampMs,
            durationMs = state.get.durationMs,
            expired = false
          )
        }
      }

    val eventsQuery = sessionized
      .writeStream
      .queryName("events")
      .outputMode("update")
      .format("console")
      .start()

    input.addData(
      """{"event_time": "2018-01-01T00:00:00", "user_id": "mike"}""",
      """{"event_time": "2018-01-01T00:01:00", "user_id": "mike"}""",
      """{"event_time": "2018-01-01T00:05:00", "user_id": "mike"}"""
    )
    input.addData(
      """{"event_time": "2018-01-01T00:45:00", "user_id": "mike"}"""
    )
    eventsQuery.processAllAvailable()
  }

  case class SessionState(startTimestampMs: Long, endTimestampMs: Long) {
    def durationMs: Long = endTimestampMs - startTimestampMs
  }

  case class SessionOutput(userId: String, startTimestampMs: Long, endTimestampMs: Long, durationMs: Long, expired: Boolean)
}

Output of that program is:

root
 |-- event_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)

state update for user mike (current watermark: 1969-12-31 19:00:00.0)
User mike has new events (min: 2018-01-01 00:00:00.0, max: 2018-01-01 00:05:00.0).
User mike has no existing state.
User mike state updated. Timeout now set to 2018-01-01 00:35:00.0
-------------------------------------------
Batch: 0
-------------------------------------------
+------+----------------+--------------+----------+-------+
|userId|startTimestampMs|endTimestampMs|durationMs|expired|
+------+----------------+--------------+----------+-------+
|  mike|   1514782800000| 1514783100000|    300000|  false|
+------+----------------+--------------+----------+-------+

state update for user mike (current watermark: 2017-12-31 23:05:00.0)
User mike has new events (min: 2018-01-01 00:45:00.0, max: 2018-01-01 00:45:00.0).
User mike has existing state.
User mike state updated. Timeout now set to 2018-01-01 01:15:00.0
-------------------------------------------
Batch: 1
-------------------------------------------
+------+----------------+--------------+----------+-------+
|userId|startTimestampMs|endTimestampMs|durationMs|expired|
+------+----------------+--------------+----------+-------+
|  mike|   1514782800000| 1514785500000|   2700000|  false|
+------+----------------+--------------+----------+-------+

Given my session definition, the single event in the second batch should trigger expiry of the session state and thus start a new session. However, since the watermark (2017-12-31 23:05:00.0) has not passed the state's timeout (2018-01-01 00:35:00.0), the state isn't expired and the event is erroneously merged into the existing session, even though more than 30 minutes of event time have elapsed since the latest timestamp in the previous batch.
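To make the watermark arithmetic concrete, here is a sketch using the values from the logs above (the variable names are mine):

// Spark's watermark is (max event time seen so far) - (watermark delay).
val maxEventTimeBatch0 = java.sql.Timestamp.valueOf("2018-01-01 00:05:00").getTime
val watermarkDelayMs = 60 * 60 * 1000L // the "1 hours" delay
val watermarkMs = maxEventTimeBatch0 - watermarkDelayMs
// watermarkMs corresponds to 2017-12-31 23:05:00, well before the timeout of
// 2018-01-01 00:35:00, so state.hasTimedOut is false when 00:45 arrives.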

I think the only way session state expiration could work as I'm hoping is if enough events from other users arrived within the batch to advance the watermark past the state timeout for mike.

I suppose one could also manipulate the stream's watermark directly, but I can't think of a way to do that that would accomplish my use case.

Is this accurate? Am I missing anything about how to properly do event-time sessionization in Spark?

asked Aug 12 '18 by Mike Sukmanowsky

1 Answer

The implementation you have provided does not seem to work when the watermark interval is greater than the session gap duration.

For the logic you have shown to work, you need to set the watermark interval to less than 30 minutes.
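For example (a sketch, reusing the stream definition from the question with only the watermark delay changed):

val events = input.toDS()
  .select(from_json($"value", EVENT_SCHEMA).alias("json"))
  .select($"json.*")
  .withColumn("event_time", to_timestamp($"event_time"))
  .withWatermark("event_time", "10 minutes") // delay < 30-minute session gap
// With a 10-minute delay, the 00:45 event advances the watermark to 00:35,
// so the timeout set at 00:35 can fire on a later trigger (the merge logic
// must still be made gap-aware, as discussed below).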

If you really want the watermark interval to be independent of (or greater than) the session gap duration, you need to wait until the watermark passes (last event time + gap duration) before expiring the state. The merging logic you have also blindly merges new events into the existing session; it should take the gap duration into account before merging.
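Here is a minimal sketch of that gap-aware merge (my own helper types and names, not the poster's code; note that mapGroupsWithState can emit only one row per group per trigger, so emitting both the closed session and the new one in a single call still needs extra care):

case class Session(startMs: Long, endMs: Long)

val gapMs = 30 * 60 * 1000L // 30-minute session gap

// Merge new event timestamps into the existing session only if the earliest
// new event falls within gapMs of the session's end; otherwise close the old
// session and start a fresh one. Returns (currentSession, closedSession).
def merge(existing: Option[Session], eventTimesMs: Seq[Long]): (Session, Option[Session]) = {
  val (minTs, maxTs) = (eventTimesMs.min, eventTimesMs.max)
  existing match {
    case Some(s) if minTs - s.endMs > gapMs =>
      (Session(minTs, maxTs), Some(s)) // gap exceeded: the old session is complete
    case Some(s) =>
      (Session(math.min(s.startMs, minTs), math.max(s.endMs, maxTs)), None)
    case None =>
      (Session(minTs, maxTs), None)
  }
}

// With the question's data, merging the 00:45 event into a session ending at
// 00:05 exceeds the 30-minute gap, so the old session is closed and a new one
// begins, instead of the blind merge seen in Batch 1.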

answered by arunmahadevan