Spark mapWithState API explanation

I have been using the mapWithState API in Spark Streaming, but two things about StateSpec.function are not clear to me:

Let's say my function is:

def trackStateForKey(batchTime: Time,
                     key: Long,
                     newValue: Option[JobData],
                     currentState: State[JobData]): Option[(Long, JobData)]

  1. Why is the new value an Option[T] type? As far as I've seen, it was always defined for me, and since the method is supposed to be called with a new value, I don't really see why it would be optional.

  2. What does the return value mean? I tried to find some pointers in the documentation and the source code, but none of them describe what it is used for. Since I'm modifying the state of a key using state.remove() and state.update(), why would I have to do the same with return values?

    In my current implementation I return None if I remove the key, and Some(newState) if I update it, but I'm not sure if that is correct.

Daniel Zolnai asked Jul 15 '16 13:07





1 Answer

Why is the new value an Option[T] type? As far as I've seen, it was always defined for me, and since the method is supposed to be called with a new value, I don't really see why it would be optional.

It is an Option[T] because, if you set a timeout using StateSpec.timeout, e.g.:

StateSpec.function(spec _).timeout(Milliseconds(5000))

then, once a key's state times out, the mapping function is called with None as the value and the isTimingOut method on State[T] returns true. This makes sense, because a timeout of the state doesn't mean that a new value has arrived for that key. It is also generally safer than passing null for T (which wouldn't work for primitives anyway), since the user is expected to operate on the Option[T] explicitly.
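Putting this together, a mapping function can check isTimingOut() before looking at the new value. The following is a minimal sketch, assuming Spark Streaming 1.6 or later on the classpath; the object name is illustrative, and an Int running total is a hypothetical stand-in for the question's JobData. Spark rejects state.update() and state.remove() on a timing-out state, so the timeout branch only reads the last stored value.

```scala
import org.apache.spark.streaming.{Milliseconds, State, StateSpec, Time}

object TimeoutAwareTracking {
  // Same shape as the question's function, with Int (a running total)
  // as a hypothetical stand-in for JobData.
  def trackStateForKey(batchTime: Time,
                       key: Long,
                       newValue: Option[Int],
                       state: State[Int]): Option[(Long, Int)] = {
    if (state.isTimingOut()) {
      // Timeout call: newValue is None here. update()/remove() are not
      // allowed on a timing-out state; Spark drops it after this call,
      // so we only emit the last known total.
      Some((key, state.get()))
    } else {
      newValue match {
        case Some(v) =>
          val total = state.getOption().getOrElse(0) + v
          state.update(total)
          Some((key, total))
        case None =>
          // Defensive: outside of timeouts Spark passes Some(value).
          None
      }
    }
  }

  // Keys that receive no data for at least 5 seconds are considered idle
  // and are timed out.
  val spec: StateSpec[Long, Int, Int, (Long, Int)] =
    StateSpec.function(trackStateForKey _).timeout(Milliseconds(5000))
}
```

Emitting Some on timeout is a design choice in this sketch: it lets downstream operators see a key's final value right before Spark drops it. Returning None instead would simply emit nothing for that key.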

You can see Spark passing None for timed-out keys in its implementation:

// Get the timed out state records, call the mapping function on each and collect the
// data returned
if (removeTimedoutData && timeoutThresholdTime.isDefined) {
  newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
    wrappedState.wrapTimingOutState(state)
    val returned = mappingFunction(batchTime, key, None, wrappedState) // <-- This.
    mappedData ++= returned
    newStateMap.remove(key)
  }
}
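The control flow of this excerpt can be re-enacted without a cluster. The following is a toy, Spark-free model in which every name (ToyBatch, ToyState, Fn, run, runningSum) is hypothetical and not Spark API; it ignores partitioning, checkpointing and how Spark decides that a key is idle. Keys that received data are passed Some(value), idle keys are passed None with the timing-out flag set and are dropped afterwards, and each returned Option is appended with ++=, just like mappedData ++= returned, so a None emits nothing. For brevity the toy function returns its new state alongside the emitted record instead of calling update()/remove() on a State object; the two outputs are still independent.

```scala
import scala.collection.mutable

// Toy model of one mapWithState batch. All names are hypothetical.
object ToyBatch {
  // Stand-in for Spark's State[S]: current state (if any) and the timing-out flag.
  final case class ToyState[S](current: Option[S], isTimingOut: Boolean)

  // (key, new value, state) => (state to keep, record to emit downstream)
  type Fn[K, V, S, E] = (K, Option[V], ToyState[S]) => (Option[S], Option[E])

  def run[K, V, S, E](states: Map[K, S],
                      batch: Seq[(K, V)],
                      timedOut: Set[K],
                      f: Fn[K, V, S, E]): (Map[K, S], List[E]) = {
    val newStates = mutable.Map.empty[K, S] ++= states
    val mapped = mutable.ArrayBuffer.empty[E]

    // Keys that received data: the function always sees Some(value).
    batch.foreach { case (k, v) =>
      val (kept, out) = f(k, Some(v), ToyState(newStates.get(k), isTimingOut = false))
      kept match {
        case Some(s) => newStates(k) = s
        case None    => newStates -= k
      }
      mapped ++= out // None contributes nothing, Some(e) contributes e
    }

    // Idle keys (caller-supplied here): the function sees None, then the key is dropped.
    val touched = batch.map(_._1).toSet
    timedOut.filter(k => newStates.contains(k) && !touched(k)).foreach { k =>
      val (_, out) = f(k, None, ToyState(newStates.get(k), isTimingOut = true))
      mapped ++= out
      newStates -= k
    }
    (newStates.toMap, mapped.toList)
  }

  // Example function: keep a running total, emit it whenever it is at least 10,
  // and emit the final total when the key times out.
  val runningSum: Fn[String, Int, Int, (String, Int)] = (key, value, st) =>
    if (st.isTimingOut) (None, Some((key, st.current.getOrElse(0))))
    else {
      val total = st.current.getOrElse(0) + value.getOrElse(0)
      (Some(total), if (total >= 10) Some((key, total)) else None)
    }

  def main(args: Array[String]): Unit = {
    val (states, emitted) =
      run(Map("a" -> 7, "b" -> 3), Seq("a" -> 5, "c" -> 1), Set("b"), runningSum)
    println(s"states:  $states")
    println(s"emitted: $emitted")
  }
}
```

With the inputs in main, "a" (7 + 5 = 12) and the timed-out "b" (3) are emitted, while "c" (total 1) is stored without producing any downstream record.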

What does the return value mean? I tried to find some pointers in the documentation and the source code, but none of them describe what it is used for. Since I'm modifying the state of a key using state.remove() and state.update(), why would I have to do the same with return values?

The return value is a way to pass intermediate results along the Spark graph. For example, assume that I want to update my state but also perform some operation in my pipeline with the intermediate data, e.g.:

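dStream
  .mapWithState(stateSpec)
  // assumes a 3-argument mapping function (key, value, state) => Option[Int],
  // so every element reaching this map is an Option[Int]
  .map(optionIntermediateResult => optionIntermediateResult.map(_ * 2))
  .foreachRDD( /* other stuff */)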

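That return value is exactly what allows me to continue operating on said data. If you don't care about the intermediate result and only want the complete state, then returning None is perfectly fine. With the Time-taking variant of StateSpec.function (the signature in the question), a None simply means nothing is emitted downstream for that key in that batch; the state itself is governed only by state.update() and state.remove(). The complete state can be read with stateSnapshots() on the DStream returned by mapWithState.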
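To make the split between stored state and emitted output concrete, here is a sketch assuming Spark Streaming 1.6 or later and an input DStream[(Long, Int)] of hypothetical events; largeTotals and wire are illustrative names. The function updates the total for every key but only emits keys whose total exceeds 100, while stateSnapshots() exposes the full state regardless of what was returned.

```scala
import org.apache.spark.streaming.{State, StateSpec, Time}
import org.apache.spark.streaming.dstream.DStream

object DownstreamWiring {
  // Hypothetical mapping function: always updates the running total,
  // but only emits a record once that total exceeds 100.
  def largeTotals(batchTime: Time,
                  key: Long,
                  newValue: Option[Int],
                  state: State[Int]): Option[(Long, Int)] = {
    val total = state.getOption().getOrElse(0) + newValue.getOrElse(0)
    state.update(total) // no timeout is configured below, so update() is always allowed
    if (total > 100) Some((key, total)) else None
  }

  def wire(events: DStream[(Long, Int)]): Unit = {
    // With the Time-taking variant, None results are dropped, so `mapped`
    // only contains the emitted (key, total) records.
    val mapped = events.mapWithState(StateSpec.function(largeTotals _))
    mapped.map { case (key, total) => (key, total * 2) }.print()

    // The complete (key, state) pairs, independent of what the function returned.
    mapped.stateSnapshots().print()
  }
}
```

Because emitted records and stored state are independent, the question's pattern (None after state.remove(), Some(newState) after state.update()) is valid; it just means removed keys produce no downstream record in that batch.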

Edit:

I've written a blog post (following this question) which attempts to give an in-depth explanation of the API.

Yuval Itzchakov answered Sep 29 '22 06:09
