Spark Structured Streaming Window on non-timestamp column

I am getting a data stream of the form:

+--+---------+---+----+
|id|timestamp|val|xxx |
+--+---------+---+----+
|1 |12:15:25 | 50| 1  |
|2 |12:15:25 | 30| 1  |
|3 |12:15:26 | 30| 2  |
|4 |12:15:27 | 50| 2  |
|5 |12:15:27 | 30| 3  |
|6 |12:15:27 | 60| 4  |
|7 |12:15:28 | 50| 5  |
|8 |12:15:30 | 60| 5  |
|9 |12:15:31 | 30| 6  |
|. |...      |...|... |

I would like to apply a window operation to the xxx column, with a window size and a sliding step, just like the window operation that Spark Streaming provides over a timestamp column.

In the groupBy with the window function below, lines is a streaming DataFrame, the window size is 2 and the sliding step is 1.

val c_windowed_count = lines.groupBy(
  window($"xxx", "2", "1"), $"val").count().orderBy("xxx")

So, the output should be as follows:

+------+---+-----+
|window|val|count|
+------+---+-----+
|[1, 3]|50 |  2  |
|[1, 3]|30 |  2  |
|[2, 4]|30 |  2  |
|[2, 4]|50 |  1  |
|[3, 5]|30 |  1  |
|[3, 5]|60 |  1  |
|[4, 6]|60 |  2  |
|[4, 6]|50 |  1  |
|...   |.. | ..  |

I tried using partitionBy but it is not supported in Spark Structured Streaming.

I am using Spark Structured Streaming 2.3.1.

Thanks!

asked Sep 12 '18 07:09 by shaikh


1 Answer

It's currently not possible to use windows on non-timestamp columns in this way using Spark Structured Streaming. However, what you can do is convert the xxx column to a timestamp column, do the groupBy and count, and then transform back.

from_unixtime converts a number of seconds since 1970-01-01 to a timestamp. Treating the xxx column as seconds yields a fake timestamp that can be used in a window:

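import org.apache.spark.sql.functions.{from_unixtime, struct, unix_timestamp, window}

// Treat xxx as seconds since the epoch so that window() accepts it
lines.groupBy(window(from_unixtime($"xxx"), "2 seconds", "1 second"), $"val").count()
  // Map the window bounds back to plain numbers (the struct fields are named col1 and col2)
  .withColumn("window", struct(unix_timestamp($"window.start"), unix_timestamp($"window.end")))
  .filter($"window.col1" =!= 0)  // drop the window [0, 2]
  .orderBy($"window.col1")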

Above, the grouping is done on the converted timestamp, and the next line converts the window bounds back to the original numbers. The filter removes the first two rows, which belong to the window [0,2] (i.e. they cover only the rows with xxx equal to 1); it can be skipped if that window should be kept.
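
To see why each row ends up in two windows, and where the [0,2] window comes from, the assignment can be reproduced without Spark. The following is only a minimal sketch in plain Scala, not Spark's implementation: it assumes windows are half-open intervals [start, start + size) aligned to multiples of the slide, and the names IntWindows, Row, windowStarts and windowedCounts are hypothetical helpers introduced for illustration.

```scala
object IntWindows {
  // One input row: the numeric column to window on (xxx) and the grouping value (val).
  final case class Row(xxx: Long, value: Int)

  // Starts of every half-open window [start, start + size) that contains x,
  // assuming windows are aligned to multiples of `slide` (an assumption of this sketch).
  def windowStarts(x: Long, size: Long, slide: Long): Seq[Long] = {
    val lastStart = Math.floorDiv(x, slide) * slide // latest window starting at or before x
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > x - size).toList.reverse
  }

  // Count rows per (window, value): a batch analogue of groupBy(window, val).count().
  def windowedCounts(rows: Seq[Row], size: Long, slide: Long): Map[((Long, Long), Int), Int] =
    rows
      .flatMap(r => windowStarts(r.xxx, size, slide).map(s => ((s, s + size), r.value)))
      .groupBy(identity)
      .map { case (key, hits) => key -> hits.size }

  // The nine sample rows from the question, as (xxx, val).
  val sample: Seq[Row] = Seq(
    Row(1, 50), Row(1, 30), Row(2, 30), Row(2, 50), Row(3, 30),
    Row(4, 60), Row(5, 50), Row(5, 60), Row(6, 30)
  )
}
```

With size 2 and slide 1, IntWindows.windowStarts(1, 2, 1) gives the starts 0 and 1, so the rows with xxx equal to 1 fall into both [0,2] and [1,3]. IntWindows.windowedCounts(IntWindows.sample, 2, 1) maps ((1, 3), 50) to 2 and ((4, 6), 60) to 2, in line with the output shown for the Spark query.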

Resulting output for the input above:

+------+---+-----+
|window|val|count|
+------+---+-----+
| [1,3]| 50|    2|
| [1,3]| 30|    2|
| [2,4]| 30|    2|
| [2,4]| 50|    1|
| [3,5]| 30|    1|
| [3,5]| 60|    1|
| [4,6]| 60|    2|
| [4,6]| 50|    1|
| [5,7]| 30|    1|
| [5,7]| 60|    1|
| [5,7]| 50|    1|
| [6,8]| 30|    1|
+------+---+-----+
answered Sep 25 '22 02:09 by Shaido