I'm using Spark Streaming to read data from Kinesis using the Structured Streaming framework, my connection is as follows
val kinesis = spark
.readStream
.format("kinesis")
.option("streams", streamName)
.option("endpointUrl", endpointUrl)
.option("initialPositionInStream", "earliest")
.option("format", "json")
.schema(<my-schema>)
.load
The data comes from several IoT devices which have a unique id, I need to aggregate the data by this id and by a tumbling window over the timestamp field, as follows:
val aggregateData = kinesis
.groupBy($"uid", window($"timestamp", "15 minute", "15 minute"))
.agg(...)
The problem I'm encountering is that I need to guarantee that every window starts at round times (such as 00:00:00, 00:15:00 and so on), also I need a guarantee that only rows containing full 15-minute long windows are going to be output to my sink, what I'm currently doing is
val query = aggregateData
.writeStream
.foreach(postgreSQLWriter)
.outputMode("update")
.start()
.awaitTermination()
Where ths postgreSQLWriter is a StreamWriter I created for inserting each row into a PostgreSQL SGBD. How can I force my windows to be exactly 15-minute long and the start time to be round 15-minute timestamp values for each device unique id?
question1: to start at specific times to start, there is one more parameters spark grouping function takes which is "offset". By specifying that it will start after the specified time from an hour Example:
dataframe.groupBy($"Column1",window($"TimeStamp","22 minute","1 minute","15 minute"))
so the above syntax will group by column1 and create windows of 22 minute duration with sliding window size of 1 minute and offset as 15 minute
for example it starts from:
window1: 8:15(8:00 add 15 minute offset) to 8:37 (8:15 add 22 minutes)
window2: 8:16(previous window start + 1 minute) to 8:38 ( 22 minute size again)
question2: to push only those windows having full 15 minute size, create a count column which counts the number of events having in that window. once it reaches 15, push it to wherever you want using filter command
calculating count:
dataframe.groupBy($"Column1",window($"TimeStamp","22 minute","1 minute","15 minute")).agg(count*$"Column2").as("count"))
writestream filter containing count 15 only:
aggregateddata.filter($"count"===15).writeStream.format(....).outputMode("complete").start()
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With