I have been using the mapWithState API in Spark Streaming, but two things are not clear about StateSpec.function:
Let's say my function is:
def trackStateForKey(batchTime: Time,
                     key: Long,
                     newValue: Option[JobData],
                     currentState: State[JobData]): Option[(Long, JobData)]
Why is the new value an Option[T] type? As far as I've seen, it was always defined for me, and since the method is supposed to be called with a new state, I don't really see the point why it could be optional.
What does the return value mean? I tried to find some pointers in the documentation and source code, but none of them describe what it is used for. Since I'm modifying the state of a key using state.remove() and state.update(), why would I have to do the same with return values?
In my current implementation I return None if I remove the key, and Some(newState) if I update it, but I'm not sure if that is correct.
To use mapWithState, we must pass a stateful function to the function method of the StateSpec object. This object represents the state specification: it wraps the stateful function and also defines the timeout, partitioner, and initial state. The underlying DStream is represented by org.
Use readStream.format("socket") from the Spark session object to read data from a socket, and provide the host and port options for the source you want to stream data from.
Spark Streaming is an extension of the core Spark API that allows data engineers and data scientists to process real-time data from various sources including (but not limited to) Kafka, Flume, and Amazon Kinesis. This processed data can be pushed out to file systems, databases, and live dashboards.
One of the most powerful features of Spark Streaming is the simple API for stateful stream processing and the associated native, fault-tolerant, state management.
Why is the new value an Option[T] type? As far as I've seen, it was always defined for me, and since the method is supposed to be called with a new state, I don't really see the point why it could be optional.
It is an Option[T] because, if you set a timeout using StateSpec.timeout, e.g.:
StateSpec.function(spec _).timeout(Milliseconds(5000))
then the value passed in once the state for a key times out will be None, and the isTimingOut method on State[T] will yield true. This makes sense, because a timeout of the state doesn't mean that a new value has arrived for the specified key. It is also generally safer than passing null for T (which wouldn't work for primitives anyway), since the user is expected to operate safely on an Option[T].
You can see that in Spark's implementation:
// Get the timed out state records, call the mapping function on each and collect the
// data returned
if (removeTimedoutData && timeoutThresholdTime.isDefined) {
  newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
    wrappedState.wrapTimingOutState(state)
    val returned = mappingFunction(batchTime, key, None, wrappedState) // <-- This.
    mappedData ++= returned
    newStateMap.remove(key)
  }
}
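The timeout case described above can be handled explicitly in the mapping function. The following is a minimal sketch, assuming Spark Streaming 1.6 or later on the classpath; the JobData case class, the merge helper, and the 5-second timeout are hypothetical stand-ins, not taken from the question's real code.

```scala
import org.apache.spark.streaming.{Seconds, State, StateSpec, Time}

// Hypothetical stand-in for the question's JobData type.
case class JobData(count: Long)

object TrackState {
  // Pure merge step (an assumption for illustration), kept separate so it
  // can be checked without a running StreamingContext.
  def merge(previous: Option[JobData], incoming: JobData): JobData =
    JobData(previous.map(_.count).getOrElse(0L) + incoming.count)

  def trackStateForKey(
      batchTime: Time,
      key: Long,
      newValue: Option[JobData],
      currentState: State[JobData]): Option[(Long, JobData)] =
    newValue match {
      case Some(incoming) =>
        // A new value arrived for this key in the current batch.
        val merged = merge(currentState.getOption(), incoming)
        currentState.update(merged)
        Some(key -> merged)
      case None =>
        // No new value: the state for this key is timing out
        // (currentState.isTimingOut() is true). Spark removes the state
        // itself, and calling update() or remove() here would throw,
        // so only the last known state is emitted.
        currentState.getOption().map(last => key -> last)
    }

  // State for a key times out once it has received no data for 5 seconds.
  val spec: StateSpec[Long, JobData, JobData, (Long, JobData)] =
    StateSpec.function(trackStateForKey _).timeout(Seconds(5))
}
```

Matching on newValue keeps the two call paths apart: the Some branch runs when data arrives, and the None branch runs only for the timeout call shown in the Spark source above.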
What does the return value mean? I tried to find some pointers in the documentation and source code, but none of them describe what it is used for. Since I'm modifying the state of a key using state.remove() and state.update(), why would I have to do the same with return values?
The return value is a way to pass intermediate state along the Spark graph. For example, assume that I want to update my state but also perform some operation in my pipeline with the intermediate data, e.g.:
dStream
  .mapWithState(stateSpec)
  .map(optionIntermediateResult => optionIntermediateResult.map(_ * 2))
  .foreachRDD( /* other stuff */ )
That return value is exactly what allows me to continue operating on said data. If you don't care for the intermediate result and only want the complete state, then outputting None is perfectly fine.
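To make the difference between the returned value and the stored state concrete, here is a rough sketch, again assuming Spark Streaming 1.6 or later. The names ReturnValueSketch, nextSum, runningSum, and wire are hypothetical, and the input stream is assumed to be built elsewhere. It uses the three-argument overload of StateSpec.function, where whatever the function returns is emitted as-is.

```scala
import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

object ReturnValueSketch {
  // Pure helper (hypothetical): next running sum from the stored sum and the new value.
  def nextSum(previous: Option[Int], value: Option[Int]): Int =
    previous.getOrElse(0) + value.getOrElse(0)

  // Three-argument mapping function: the Int it returns becomes an element
  // of the mapped stream. No timeout is configured on the spec below, so
  // updating the state unconditionally is safe in this sketch.
  def runningSum(key: String, value: Option[Int], state: State[Int]): Int = {
    val sum = nextSum(state.getOption(), value)
    state.update(sum)
    sum
  }

  val spec: StateSpec[String, Int, Int, Int] = StateSpec.function(runningSum _)

  // Returns (values emitted by the mapping function, doubled;
  //          snapshot of the full key -> state map for each batch).
  def wire(events: DStream[(String, Int)]): (DStream[Int], DStream[(String, Int)]) = {
    val mapped = events.mapWithState(spec)
    val doubled = mapped.map(_ * 2)          // operate on the returned values
    val snapshots = mapped.stateSnapshots()  // operate on the complete state instead
    (doubled, snapshots)
  }
}
```

The mapped stream carries only what the function returned for keys touched in the batch, while stateSnapshots() exposes the state of every key, which is why returning None (in the Option-returning overload) loses nothing if only the complete state matters.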
I've written a blog post (following this question) which attempts to give an in-depth explanation of the API.