What would be some considerations for choosing stateless sliding-window operations (e.g. reduceByKeyAndWindow) vs. choosing to keep state (e.g. via updateStateByKey or the new mapStateByKey) when handling a stream of sequential, finite event sessions with Spark Streaming?
For example, consider the following scenario:
A wearable device tracks physical exercises performed by the wearer. The device automatically detects when an exercise starts, and emits a message; emits additional messages while the exercise is undergoing (e.g. heart rate); and finally, emits a message when the exercise is done.
The desired result is a stream of aggregated records per exercise session. i.e. all events of the same session should be aggregated together (e.g. so that each session could be saved in a single DB row). Note that each session has a finite length, but the entire stream from multiple devices is continuous. For convenience, let's assume the device generates a GUID for each exercise session.
I can see two approaches for handling this use-case with Spark Streaming:
Using non-overlapping windows, and keeping state. A state is saved per GUID, with all events matching it. When a new event arrives, the state is updated (e.g. using mapWithState), and in case the event is "end of exercise session", an aggregated record based on the state will be emitted, and the key removed.
Using overlapping sliding windows, and keeping only the first sessions. Assume a sliding window of length 2 and interval 1 (see diagram below). Also assume that the window length is 2 X (maximal possible exercise time). On each window, events are aggreated by GUID, e.g. using reduceByKeyAndWindow. Then, all sessions which started at the second half of the window are dumped, and the remaining sessions emitted. This enables using each event exactly once, and ensures all events belonging to the same session will be aggregated together.
Diagram for approach #2:
Only sessions starting in the areas marked with \\\ will be emitted. ----------- |window 1 | |\\\\| | ----------- ---------- |window 2 | |\\\\| | ----------- ---------- |window 3 | |\\\\| | -----------
Pros and cons I see:
Approach #1 is less computationally expensive, but requires saving and managing state (e.g. if the number of concurrent sessions increases, the state might get larger than memory). However if the maximal number of concurrent sessions is bounded, this might not be an issue.
Approach #2 is twice as expensive (each event is processed twice), and with higher latency (2 X maximal exercise time), but more simple and easily manageable, as no state is retained.
What would be the best way to handle this use case - is any of these approaches the "right" one, or are there better ways?
What other pros/cons should be taken into consideration?
Stateful Streaming in SparkApache Spark is a fast and general-purpose cluster computing system. In Spark, we can do the batch processing and stream processing as well. It does near real-time processing. It means that it processes the data in micro-batches.
The purpose of the state store is to provide a reliable place from where the engine can read the intermediary result of Structured Streaming aggregations. Thanks to this place Spark can, even in the case of driver failure, recover the processing state to the point before the failure. In the analyzed version (2.2.
The simplest windowing function is a window, which lets you create a new DStream, computed by applying the windowing parameters to the old DStream. You can use any of the DStream operations on the new stream, so you get all the flexibility you want.
One of the most powerful features of Spark Streaming is the simple API for stateful stream processing and the associated native, fault-tolerant, state management.
Normally there is no right approach, each has tradeoffs. Therefore I'd add additional approach to the mix and will outline my take on their pros and cons. So you can decide which one is more suitable for you.
You can accumulate state of the events in external storage. Cassandra is quite often used for that. You can handle final and ongoing events separately for example like below:
val stream = ... val ongoingEventsStream = stream.filter(!isFinalEvent) val finalEventsStream = stream.filter(isFinalEvent) ongoingEventsStream.foreachRDD { /*accumulate state in casssandra*/ } finalEventsStream.foreachRDD { /*finalize state in casssandra, move to final destination if needed*/ }
It might be potentially optimal solution for you as it removes drawbacks of updateStateByKey, but considering it is just got released as part of Spark 1.6 release, it could be risky as well (since for some reason it is not very advertised). You can use the link as starting point if you want to find out more
While it is possible to achieve what you need with windows, it looks significantly less natural in your scenario.
I'd try the following:
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With