How to group by time interval in Spark SQL

My dataset looks like this:

KEY |Event_Type | metric | Time 
001 |event1     | 10     | 2016-05-01 10:50:51
002 |event2     | 100    | 2016-05-01 10:50:53
001 |event3     | 20     | 2016-05-01 10:50:55
001 |event1     | 15     | 2016-05-01 10:51:50
003 |event1     | 13     | 2016-05-01 10:55:30
001 |event2     | 12     | 2016-05-01 10:57:00
001 |event3     | 11     | 2016-05-01 11:00:01

I want to get all the keys that satisfy this condition:

"SUM of metric for a specific event" > threshold during 5 minutes.

This appears to me to be a perfect candidate for sliding window functions.

How can I do this with Spark SQL?

Thank you.

Nabil asked Jun 04 '16 15:06


1 Answer

Spark >= 2.0

You can use window (not to be confused with window functions). Depending on the variant, it assigns a timestamp to one or more potentially overlapping buckets:

import org.apache.spark.sql.functions.window

// Time is stored as a string, so cast it to a timestamp before bucketing
df.groupBy($"KEY", window($"Time".cast("timestamp"), "5 minutes")).sum("metric")

// +---+---------------------------------------------+-----------+
// |KEY|window                                       |sum(metric)|
// +---+---------------------------------------------+-----------+
// |001|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|45         |
// |001|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|12         |
// |003|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|13         |
// |001|[2016-05-01 11:00:00.0,2016-05-01 11:05:00.0]|11         |
// |002|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|100        |
// +---+---------------------------------------------+-----------+
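The question asks for a threshold per event over 5 minutes, so a possible extension of the grouping above is sketched here. It assumes the overlapping variant of window (5-minute windows starting every minute), groups by Event_Type as well as KEY, and uses a made-up threshold of 20. The names SlidingThreshold, exceeding and sample are hypothetical and not part of the original answer. Note that windows start on whole minutes, so two events less than 5 minutes apart can still fall outside every common window.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, sum, window}

object SlidingThreshold {
  // Hypothetical helper: (KEY, Event_Type) pairs whose metric total exceeds
  // `threshold` in at least one 5-minute window; windows start every minute.
  def exceeding(df: DataFrame, threshold: Long): DataFrame =
    df.groupBy(
        col("KEY"),
        col("Event_Type"),
        window(col("Time").cast("timestamp"), "5 minutes", "1 minute"))
      .agg(sum(col("metric")).alias("total"))
      .where(col("total") > threshold)
      .select("KEY", "Event_Type")
      .distinct()

  // The sample rows from the question.
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      ("001", "event1", 10, "2016-05-01 10:50:51"),
      ("002", "event2", 100, "2016-05-01 10:50:53"),
      ("001", "event3", 20, "2016-05-01 10:50:55"),
      ("001", "event1", 15, "2016-05-01 10:51:50"),
      ("003", "event1", 13, "2016-05-01 10:55:30"),
      ("001", "event2", 12, "2016-05-01 10:57:00"),
      ("001", "event3", 11, "2016-05-01 11:00:01")
    ).toDF("KEY", "Event_Type", "metric", "Time")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("sliding-threshold")
      .getOrCreate()
    // The threshold of 20 is an assumption made for illustration only.
    exceeding(sample(spark), threshold = 20L).show()
    spark.stop()
  }
}
```

With the sample rows and a threshold of 20, only 001/event1 (10 + 15 within one window) and 002/event2 (100) should remain.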

Spark < 2.0

Let's start with example data:

import spark.implicits._  // import sqlContext.implicits._ in Spark < 2.0

val df = Seq(
  ("001", "event1", 10, "2016-05-01 10:50:51"),
  ("002", "event2", 100, "2016-05-01 10:50:53"),
  ("001", "event3", 20, "2016-05-01 10:50:55"),
  ("001", "event1", 15, "2016-05-01 10:51:50"),
  ("003", "event1", 13, "2016-05-01 10:55:30"),
  ("001", "event2", 12, "2016-05-01 10:57:00"),
  ("001", "event3", 11, "2016-05-01 11:00:01")
).toDF("KEY", "Event_Type", "metric", "Time")

I assume that an event is identified by KEY. If this is not the case, you can adjust the GROUP BY / PARTITION BY clauses according to your requirements.

If you're interested in an aggregation with a static window, independent of the data, convert the timestamps to a numeric data type and round:

import org.apache.spark.sql.functions.{round, sum}

// Cast string to timestamp, then to seconds since the epoch
val ts = $"Time".cast("timestamp").cast("long")

// Round to 300 seconds interval
// In Spark >= 3.1 replace cast("timestamp") with timestamp_seconds
val interval = (round(ts / 300L) * 300.0).cast("timestamp").alias("interval")

df.groupBy($"KEY", interval).sum("metric")

// +---+---------------------+-----------+
// |KEY|interval             |sum(metric)|
// +---+---------------------+-----------+
// |001|2016-05-01 11:00:00.0|11         |
// |001|2016-05-01 10:55:00.0|12         |
// |001|2016-05-01 10:50:00.0|45         |
// |003|2016-05-01 10:55:00.0|13         |
// |002|2016-05-01 10:50:00.0|100        |
// +---+---------------------+-----------+
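One thing worth checking in the rounding step is which boundary a timestamp lands on. The plain Scala sketch below (no Spark needed) mirrors the arithmetic of round(ts / 300) * 300 and compares it with flooring. The object and method names are hypothetical, and the second timestamp is an invented reading at 10:53:00, not a row from the question.

```scala
object BucketArithmetic {
  val WidthSeconds = 300L

  // Nearest multiple of the width, mirroring round(ts / 300) * 300.
  def nearestBoundary(ts: Long): Long =
    math.round(ts.toDouble / WidthSeconds) * WidthSeconds

  // Start of the 300-second interval that contains ts.
  def intervalStart(ts: Long): Long =
    java.lang.Math.floorDiv(ts, WidthSeconds) * WidthSeconds

  def main(args: Array[String]): Unit = {
    val early = 1462092651L // 10:50:51 in the answer's output
    val late = early + 129L // hypothetical reading at 10:53:00

    // In the first half of an interval both functions agree.
    println(nearestBoundary(early) == intervalStart(early)) // true

    // In the second half, rounding moves to the next boundary (10:55:00),
    // while flooring stays at the start of the containing interval (10:50:00).
    println(nearestBoundary(late) - intervalStart(late)) // 300
  }
}
```

All sample rows happen to sit in the first half of their interval, so the output shown above is the same either way. If buckets of the form [start, start + 5 minutes) are wanted for arbitrary data, using floor from org.apache.spark.sql.functions in place of round is probably the safer choice.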

If you're interested in a window relative to the current row, use window functions:

import org.apache.spark.sql.expressions.Window

// Partition by KEY
// Order by timestamp
// Consider window of -150 seconds to + 150 seconds relative to the current row
val w = Window.partitionBy($"KEY").orderBy("ts").rangeBetween(-150, 150)

df.withColumn("ts", ts).withColumn("window_sum", sum($"metric").over(w))

// +---+----------+------+-------------------+----------+----------+
// |KEY|Event_Type|metric|Time               |ts        |window_sum|
// +---+----------+------+-------------------+----------+----------+
// |003|event1    |13    |2016-05-01 10:55:30|1462092930|13        |
// |001|event1    |10    |2016-05-01 10:50:51|1462092651|45        |
// |001|event3    |20    |2016-05-01 10:50:55|1462092655|45        |
// |001|event1    |15    |2016-05-01 10:51:50|1462092710|45        |
// |001|event2    |12    |2016-05-01 10:57:00|1462093020|12        |
// |001|event3    |11    |2016-05-01 11:00:01|1462093201|11        |
// |002|event2    |100   |2016-05-01 10:50:53|1462092653|100       |
// +---+----------+------+-------------------+----------+----------+

For performance reasons, this approach is useful only if the data can be partitioned into multiple separate groups. In Spark < 2.0.0 you'll also need HiveContext to make it work.
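To get from the window-function sum to the keys the question asks for, one option is to look back 5 minutes from each row and filter on the result. This is only a sketch under assumptions: it partitions by Event_Type as well as KEY, treats "during 5 minutes" as the current second plus the 299 seconds before it, and uses a made-up threshold of 20. TrailingThreshold and exceeding are hypothetical names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

object TrailingThreshold {
  // Hypothetical helper: rows at which the metric total of the same KEY and
  // Event_Type over the trailing 300 seconds exceeds `threshold`.
  def exceeding(df: DataFrame, threshold: Long): DataFrame = {
    // Seconds since the epoch, so that rangeBetween works in seconds.
    val withTs = df.withColumn("ts", col("Time").cast("timestamp").cast("long"))

    // Current second plus the 299 seconds before it: 300 seconds in total.
    val w = Window
      .partitionBy(col("KEY"), col("Event_Type"))
      .orderBy(col("ts"))
      .rangeBetween(-299, 0)

    withTs
      .withColumn("trailing_sum", sum(col("metric")).over(w))
      .where(col("trailing_sum") > threshold)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("trailing-threshold")
      .getOrCreate()
    import spark.implicits._

    // A subset of the question's rows, enough to show both outcomes.
    val df = Seq(
      ("001", "event1", 10, "2016-05-01 10:50:51"),
      ("001", "event1", 15, "2016-05-01 10:51:50"),
      ("003", "event1", 13, "2016-05-01 10:55:30")
    ).toDF("KEY", "Event_Type", "metric", "Time")

    // The threshold of 20 is an assumption made for illustration only.
    exceeding(df, threshold = 20L).show()
    spark.stop()
  }
}
```

Unlike grouping into fixed buckets, this evaluates the sum at every event, so a burst is not missed because it straddles a bucket boundary.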

zero323 answered Sep 17 '22 02:09