 

Spark Streaming Guarantee Specific Start Window Time

I'm using Spark Streaming to read data from Kinesis using the Structured Streaming framework. My connection is as follows:

val kinesis = spark
  .readStream
  .format("kinesis")
  .option("streams", streamName)
  .option("endpointUrl", endpointUrl)
  .option("initialPositionInStream", "earliest")
  .option("format", "json")
  .schema(<my-schema>)
  .load

The data comes from several IoT devices, each with a unique id. I need to aggregate the data by this id and by a tumbling window over the timestamp field, as follows:

val aggregateData = kinesis
    .groupBy($"uid", window($"timestamp", "15 minute", "15 minute"))
    .agg(...)

The problem I'm encountering is that I need to guarantee that every window starts at a round time (such as 00:00:00, 00:15:00, and so on). I also need a guarantee that only rows covering full 15-minute windows are output to my sink. What I'm currently doing is:

val query = aggregateData
    .writeStream
      .foreach(postgreSQLWriter)
      .outputMode("update")
      .start()
      .awaitTermination()

Here postgreSQLWriter is a ForeachWriter I created for inserting each row into a PostgreSQL DBMS. How can I force my windows to be exactly 15 minutes long and to start at round 15-minute timestamps for each device's unique id?
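For reference, a minimal sketch of what such a writer could look like; the JDBC URL, credentials, table and column names below are assumptions for illustration, not my actual setup:

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{ForeachWriter, Row}

// Hypothetical ForeachWriter[Row]: opens a JDBC connection per partition,
// inserts one row per processed record, and closes the connection afterwards.
val postgreSQLWriter = new ForeachWriter[Row] {
  var connection: Connection = _
  var statement: PreparedStatement = _

  override def open(partitionId: Long, version: Long): Boolean = {
    // Assumed connection details and target table.
    connection = DriverManager.getConnection(
      "jdbc:postgresql://localhost:5432/mydb", "user", "password")
    statement = connection.prepareStatement(
      "INSERT INTO aggregates (uid, window_start, window_end) VALUES (?, ?, ?)")
    true
  }

  override def process(row: Row): Unit = {
    // window() produces a struct column with start and end timestamps.
    val w = row.getAs[Row]("window")
    statement.setString(1, row.getAs[String]("uid"))
    statement.setTimestamp(2, w.getAs[java.sql.Timestamp]("start"))
    statement.setTimestamp(3, w.getAs[java.sql.Timestamp]("end"))
    statement.executeUpdate()
    // ...plus whatever aggregate columns .agg(...) produces.
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (statement != null) statement.close()
    if (connection != null) connection.close()
  }
}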

asked Sep 20 '17 by Eduardo Morgan


1 Answer

Question 1: To make the windows start at specific times, Spark's window grouping function takes one more parameter, an offset (the startTime argument). By specifying it, the window start times are shifted by that offset. Example:

dataframe.groupBy($"Column1",window($"TimeStamp","22 minute","1 minute","15 minute"))

The above groups by Column1 and creates windows of 22 minutes' duration with a slide interval of 1 minute and an offset (startTime) of 15 minutes.

For example, the windows start from:

window 1: 8:15 (8:00 plus the 15-minute offset) to 8:37 (8:15 plus 22 minutes)
window 2: 8:16 (previous window start plus the 1-minute slide) to 8:38 (again 22 minutes long)
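Applied to the question's 15-minute tumbling windows (a sketch, not part of the original answer): when windowDuration equals slideDuration the windows tumble, and because Spark aligns window starts to 1970-01-01 00:00:00 UTC plus the startTime offset, a 15-minute tumbling window with the default offset of 0 already starts at :00, :15, :30 and :45. The count aggregate below is a placeholder for the question's elided .agg(...):

// 15-minute tumbling windows aligned to round clock times; the fourth
// argument (the startTime offset) is written out explicitly with its default value.
val aggregateData = kinesis
  .groupBy(
    $"uid",
    window($"timestamp", "15 minutes", "15 minutes", "0 minutes"))
  .agg(count($"uid").as("count"))  // placeholder aggregate; the question elides .agg(...)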

Question 2: To push only windows that cover a full 15 minutes, create a count column that counts the number of events in each window. Once it reaches 15, push the row wherever you want using a filter.

Calculating the count:

dataframe.groupBy($"Column1",window($"TimeStamp","22 minute","1 minute","15 minute")).agg(count*$"Column2").as("count"))

Write stream that keeps only windows with a count of 15:

aggregateddata.filter($"count"===15).writeStream.format(....).outputMode("complete").start()
answered Nov 09 '22 by imran