I am getting a data stream of the form:
+--+---------+---+----+
|id|timestamp|val|xxx |
+--+---------+---+----+
|1 |12:15:25 | 50| 1 |
|2 |12:15:25 | 30| 1 |
|3 |12:15:26 | 30| 2 |
|4 |12:15:27 | 50| 2 |
|5 |12:15:27 | 30| 3 |
|6 |12:15:27 | 60| 4 |
|7 |12:15:28 | 50| 5 |
|8 |12:15:30 | 60| 5 |
|9 |12:15:31 | 30| 6 |
|. |... |...|... |
I am interested in applying a window operation to the xxx column, just like the window operation over a timestamp column that is available in Spark Streaming, with some window size and sliding step.
In the groupBy with the window function below, lines represents a streaming dataframe, and the window has size 2 and sliding step 1.
val c_windowed_count = lines.groupBy(
window($"xxx", "2", "1"), $"val").count().orderBy("xxx")
So, the output should be as follows:
+------+---+-----+
|window|val|count|
+------+---+-----+
|[1, 3]|50 | 2 |
|[1, 3]|30 | 2 |
|[2, 4]|30 | 2 |
|[2, 4]|50 | 1 |
|[3, 5]|30 | 1 |
|[3, 5]|60 | 1 |
|[4, 6]|60 | 2 |
|[4, 6]|50 | 1 |
|... |.. | .. |
I tried using partitionBy, but it is not supported in Spark Structured Streaming.
I am using Spark Structured Streaming 2.3.1.
Thanks!
It's currently not possible to use windows on non-timestamp columns in this way using Spark Structured Streaming. However, what you can do is convert the xxx column to a timestamp column, do the groupBy and count, and then transform back.
from_unixtime can be used to convert the number of seconds since 1970-01-01 to a timestamp. Treating the xxx column as seconds makes it possible to create a fake timestamp to use in a window:
lines.groupBy(window(from_unixtime($"xxx"), "2 seconds", "1 seconds"), $"val").count()
.withColumn("window", struct(unix_timestamp($"window.start"), unix_timestamp($"window.end")).as("window"))
.filter($"window.col1" =!= 0)
.orderBy($"window.col1")
Above, the grouping is done on the converted timestamp, and the next line converts the window bounds back to the original numbers. The filter is there because the first two rows also fall in a window [0,2] (which contains only the rows with xxx equal to 1); it can be skipped.
Resulting output of the above input:
+------+---+-----+
|window|val|count|
+------+---+-----+
| [1,3]| 50| 2|
| [1,3]| 30| 2|
| [2,4]| 30| 2|
| [2,4]| 50| 1|
| [3,5]| 30| 1|
| [3,5]| 60| 1|
| [4,6]| 60| 2|
| [4,6]| 50| 1|
| [5,7]| 30| 1|
| [5,7]| 60| 1|
| [5,7]| 50| 1|
| [6,8]| 30| 1|
+------+---+-----+
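As a sanity check on the table above, the same window assignment can be modeled in plain Scala without Spark. This is only a sketch: the object IntegerSlidingWindow and its helpers windowsFor and windowedCounts are hypothetical names, not part of Spark. It assumes that windows are half-open intervals [start, start + size) aligned to multiples of the slide, which matches how Spark's window function behaves with the default start offset, so a window printed as [1,3] covers xxx values 1 and 2.

```scala
object IntegerSlidingWindow {
  // Sample rows as (val, xxx), taken from the question's input table.
  val rows: Seq[(Int, Int)] = Seq(
    (50, 1), (30, 1), (30, 2), (50, 2), (30, 3),
    (60, 4), (50, 5), (60, 5), (30, 6)
  )

  // All windows (start, end) with start a multiple of `slide`
  // and start <= x < start + size (half-open, assumed as in Spark).
  def windowsFor(x: Int, size: Int, slide: Int): List[(Int, Int)] = {
    val lastStart = Math.floorDiv(x, slide) * slide
    Iterator.iterate(lastStart)(_ - slide)
      .takeWhile(start => start + size > x)
      .map(start => (start, start + size))
      .toList
  }

  // Count rows per (window, val), dropping windows that start at 0,
  // mirroring the filter in the answer's query.
  def windowedCounts(size: Int, slide: Int): Map[((Int, Int), Int), Int] =
    rows
      .flatMap { case (v, x) => windowsFor(x, size, slide).map(w => (w, v)) }
      .filter { case ((start, _), _) => start != 0 }
      .groupBy(identity)
      .map { case (key, hits) => key -> hits.size }

  def main(args: Array[String]): Unit =
    windowedCounts(size = 2, slide = 1).toSeq
      .sortBy { case ((w, v), _) => (w._1, v) }
      .foreach { case ((w, v), n) => println(s"[${w._1},${w._2}] val=$v count=$n") }
}
```

Each xxx value lands in size / slide = 2 overlapping windows, which is why most rows are counted twice across the output.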