
Spark Streaming mapWithState seems to rebuild complete state periodically

I am working on a Scala (2.11) / Spark (1.6.1) streaming project and using mapWithState() to keep track of seen data from previous batches.

The state is distributed across 20 partitions on multiple nodes, created with StateSpec.function(trackStateFunc _).numPartitions(20). In this state we have only a few keys (~100) mapped to Sets with up to ~160,000 entries, which grow throughout the application. The entire state is up to 3GB, which can be handled by each node in the cluster. In each batch, some data is added to the state, but nothing is deleted until the very end of the process, i.e. after ~15 minutes.

While following the application UI, every 10th batch's processing time is very high compared to the other batches. See images:

The spikes show the higher processing time.

The yellow fields represent the high processing time.


A more detailed Job view shows that, in these batches, the delay occurs at a certain point, exactly when all 20 partitions are "skipped". Or at least that is what the UI says.


My understanding of "skipped" is that each state partition is one possible task which isn't executed, as it doesn't need to be recomputed. However, I don't understand why the number of skips varies in each Job and why the last Job requires so much processing. The higher processing time occurs regardless of the state's size; the size only affects the duration.

Is this a bug in the mapWithState() functionality or is this intended behaviour? Does the underlying data structure require some kind of reshuffling, does the Set in the state need to copy data? Or is it more likely to be a flaw in my application?

Lawrence Benson asked Mar 16 '16 17:03




1 Answer

Is this a bug in the mapWithState() functionality or is this intended behaviour?

This is intended behavior. The spikes you're seeing occur because your state is checkpointed at the end of those batches. If you look at the timing of the longer batches, you'll see that they occur consistently every 100 seconds. That's because the checkpoint interval is constant: unless you explicitly set it with DStream.checkpoint, it is computed as your batchDuration (how often you read a batch from your data source) multiplied by a constant.

Here is the relevant piece of code from MapWithStateDStream:

override def initialize(time: Time): Unit = {
  if (checkpointDuration == null) {
    checkpointDuration = slideDuration * DEFAULT_CHECKPOINT_DURATION_MULTIPLIER
  }
  super.initialize(time)
}

Where DEFAULT_CHECKPOINT_DURATION_MULTIPLIER is:

private[streaming] object InternalMapWithStateDStream {
  private val DEFAULT_CHECKPOINT_DURATION_MULTIPLIER = 10
}

This lines up exactly with the behavior you're seeing: your batch duration is 10 seconds, so 10 * 10 = 100 seconds.
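As a rough illustration of that arithmetic, here is a small plain-Scala model with no Spark dependency. The object and method names are hypothetical and exist only for this sketch. It also assumes a simplified schedule in which batch i ends i * batchSeconds after the start, which is not necessarily how Spark aligns checkpoints internally.

```scala
// Simplified model of the default checkpoint schedule; not Spark code.
object CheckpointSchedule {
  // Mirrors the value of DEFAULT_CHECKPOINT_DURATION_MULTIPLIER quoted above.
  val DefaultMultiplier = 10

  // Default checkpoint interval in seconds for a given batch (slide) duration.
  def checkpointIntervalSeconds(batchSeconds: Int): Int =
    batchSeconds * DefaultMultiplier

  // 1-based indices of the batches that end on a checkpoint boundary,
  // under the assumption that batch i ends at i * batchSeconds.
  def checkpointedBatches(batchSeconds: Int, totalBatches: Int): Seq[Int] = {
    val interval = checkpointIntervalSeconds(batchSeconds)
    (1 to totalBatches).filter(i => (i * batchSeconds) % interval == 0)
  }

  def main(args: Array[String]): Unit = {
    println(checkpointIntervalSeconds(10))               // 100
    println(checkpointedBatches(10, 30).mkString(", "))  // 10, 20, 30
  }
}
```

With a 10-second batch duration, every 10th batch falls on a checkpoint boundary in this model, which matches the spacing of the spikes described in the question.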

This is normal; it is the cost of persisting state with Spark. One optimization on your side is to minimize the size of the state you keep in memory, so that this serialization is as quick as possible. Additionally, make sure the data is spread across enough executors so that state is distributed uniformly between all nodes. Also, I hope you've turned on Kryo serialization instead of the default Java serialization; it can give you a meaningful performance boost.
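A minimal configuration sketch of the two knobs mentioned in this answer: switching to Kryo and setting the checkpoint interval explicitly. It needs Spark Streaming on the classpath. The object name, the body of trackStateFunc, the key and value types, and the 200-second interval are all assumptions made for illustration; they are not taken from the question. A longer interval makes the spikes less frequent, but it is a trade-off rather than a guaranteed improvement.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

object StateTuningSketch {
  // Hypothetical mapping function: adds each new value to the key's Set
  // and emits the current Set size.
  def trackStateFunc(key: String, value: Option[String], state: State[Set[String]]): Int = {
    val updated = state.getOption.getOrElse(Set.empty[String]) ++ value.toList
    state.update(updated)
    updated.size
  }

  // Use Kryo instead of the default Java serialization.
  def configure(): SparkConf =
    new SparkConf()
      .setAppName("state-tuning-sketch") // assumed application name
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

  // Override the default (10 x batch duration) checkpoint interval.
  // The interval should be a multiple of the batch duration, and the
  // StreamingContext still needs a checkpoint directory set elsewhere.
  def withState(input: DStream[(String, String)]): DStream[Int] = {
    val spec = StateSpec.function(trackStateFunc _).numPartitions(20)
    input.mapWithState(spec).checkpoint(Seconds(200))
  }
}
```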

Yuval Itzchakov answered Nov 08 '22 22:11
