A window of a windowed DStream does not work in Spark Streaming. It seems to be a scheduler bug inside Spark Streaming.
val layer0 = ... // Input data
val layer1 = layer0.window(Seconds(30), Seconds(30)) // Works
layer1.foreachRDD(...)
val layer2 = layer1.window(Seconds(60), Seconds(60)) // Does not work
layer2.foreachRDD(...)
Has anyone hit this issue, and do you know how to fix it in Spark?
Adding more detail from the driver log:
Time 1433141250000 :
2015-06-01 06:47:30 INFO MapValuedDStream - Time 1433141250000 ms is invalid as zeroTime is 1433141240000 ms and slideDuration is 30000 ms and difference is 10000 ms
2015-06-01 06:47:30 INFO MapValuedDStream - Time 1433141250000 ms is invalid as zeroTime is 1433141240000 ms and slideDuration is 60000 ms and difference is 10000 ms
Time 1433141260000 :
2015-06-01 06:47:40 INFO MapValuedDStream - Time 1433141260000 ms is invalid as zeroTime is 1433141240000 ms and slideDuration is 30000 ms and difference is 20000 ms
2015-06-01 06:47:40 INFO MapValuedDStream - Time 1433141260000 ms is invalid as zeroTime is 1433141240000 ms and slideDuration is 60000 ms and difference is 20000 ms
Time 1433141270000 : (30s)
2015-06-01 06:47:50 INFO FilteredDStream - Slicing from 1433141250000 ms to 1433141270000 ms (aligned to 1433141250000 ms and 1433141270000 ms)
2015-06-01 06:47:50 INFO MapValuedDStream - Time 1433141270000 ms is invalid as zeroTime is 1433141240000 ms and slideDuration is 60000 ms and difference is 30000 ms
Time 1433141280000 :
2015-06-01 06:48:00 INFO MapValuedDStream - Time 1433141280000 ms is invalid as zeroTime is 1433141240000 ms and slideDuration is 30000 ms and difference is 40000 ms
2015-06-01 06:48:00 INFO MapValuedDStream - Time 1433141280000 ms is invalid as zeroTime is 1433141240000 ms and slideDuration is 60000 ms and difference is 40000 ms
Time 1433141290000 :
2015-06-01 06:48:10 INFO MapValuedDStream - Time 1433141290000 ms is invalid as zeroTime is 1433141240000 ms and slideDuration is 30000 ms and difference is 50000 ms
2015-06-01 06:48:10 INFO MapValuedDStream - Time 1433141290000 ms is invalid as zeroTime is 1433141240000 ms and slideDuration is 60000 ms and difference is 50000 ms
Time 1433141300000 : (60s)
2015-06-01 06:48:20 INFO WindowedDStream - Slicing from 1433141270000 ms to 1433141300000 ms (aligned to 1433141250000 ms and 1433141280000 ms)
2015-06-01 06:48:20 INFO WindowedDStream - Time 1433141250000 ms is invalid as zeroTime is 1433141240000 ms and slideDuration is 30000 ms and difference is 10000 ms
2015-06-01 06:48:20 INFO WindowedDStream - Time 1433141280000 ms is invalid as zeroTime is 1433141240000 ms and slideDuration is 30000 ms and difference is 40000 ms
Note that at the 60s boundary the WindowedDStream aligns its slice to 1433141250000 ms and 1433141280000 ms, neither of which is a valid batch time for the parent stream (zeroTime 1433141240000 ms, slide 30000 ms, so valid times are 1433141270000 ms, 1433141300000 ms, ...). That is why the 60s window never produces output.
Some of the common window-based operations are listed below. All of them take the same two parameters, windowLength and slideInterval, and return a new DStream computed over windowed batches of the source DStream. For example:
reduceByKeyAndWindow(reduceFunc, new Duration(30000), new Duration(10000));
The sliding interval is the amount of time (here, in seconds) by which the window shifts, i.e. how often the computation is triggered. For example, if the computation is kicked off every second, the sliding interval is 1 and you get results at time=1, time=2, time=3, ...; if you set the sliding interval to 2, you get results at time=1, time=3, time=5, ...
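A minimal, self-contained sketch of both parameters (the socket source, host/port, and the 10-second batch interval are illustrative assumptions, not from the original post):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("WindowExample")
val ssc = new StreamingContext(conf, Seconds(10))   // 10s batch interval

// Illustrative source: words from a socket, mapped to (word, 1) pairs.
val pairs = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1))

// windowLength = 30s, slideInterval = 10s: every 10s, reduce the last 30s of data.
val windowedCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
windowedCounts.print()

ssc.start()
ssc.awaitTermination()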
Spark Streaming: Window
The simplest windowing function is window(), which creates a new DStream by applying the window parameters to the old DStream. You can use any of the DStream operations on the new stream, so you get all the flexibility you want.
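For instance (a minimal sketch; lines stands for any existing DStream[String], such as the socket stream in the sketch above):

// Every 10s, look at the last 30s of data; ordinary DStream ops apply to the result.
val windowed = lines.window(Seconds(30), Seconds(10))
windowed.count().print()
windowed.filter(_.contains("ERROR")).print()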
Structured Streaming (as opposed to the DStream API used above) uses readStream to monitor a folder and process files as they arrive in the directory in real time, and writeStream to write out a streaming DataFrame or Dataset. Spark Streaming is a scalable, high-throughput, fault-tolerant stream-processing system that supports both batch and streaming workloads.
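A minimal sketch of that file-source pattern, assuming Structured Streaming (Spark 2.x+); the schema and paths are illustrative:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder.appName("FileStreamSketch").getOrCreate()

// File sources require an explicit schema; this one-column schema is illustrative.
val schema = new StructType().add("line", StringType)

val df = spark.readStream
  .schema(schema)
  .csv("/path/to/input")                                // monitored directory (assumed)

val query = df.writeStream
  .format("parquet")
  .option("path", "/path/to/output")                    // assumed output path
  .option("checkpointLocation", "/path/to/checkpoint")  // required for file sinks
  .start()

query.awaitTermination()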
It is indeed a bug and I've logged it as SPARK-7326. I've also fixed it myself; see my pull request, which has been merged into master. I believe the fix will be in the 1.4.0 release.
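On releases without the fix, a possible workaround (my assumption, not from the original answer: a tumbling 60s window of the source covers the same data as a tumbling 60s window of the tumbling 30s stream) is to derive both windows directly from the source instead of chaining them:

// Hypothetical workaround: window layer0 twice rather than windowing layer1 again.
val layer1 = layer0.window(Seconds(30), Seconds(30))    // 30s tumbling window
val layer2 = layer0.window(Seconds(60), Seconds(60))    // 60s tumbling window, same source
layer1.foreachRDD(rdd => println(s"30s window: ${rdd.count()} records"))
layer2.foreachRDD(rdd => println(s"60s window: ${rdd.count()} records"))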