Spark Streaming: Why are internal processing costs so high to handle user state of a few MB?

Based on our experiments, we see that stateful Spark Streaming's internal processing costs take a significant amount of time once the state grows beyond a million objects. As a result latency suffers, because we have to increase the batch interval to avoid unstable behavior (processing time > batch interval).

It has nothing to do with the specifics of our app, since it can be reproduced with the code below.

What exactly are the Spark internal processing/infrastructure costs that make it take so much time to handle user state? Are there any options to decrease processing time besides simply increasing the batch interval?

We planned to use state extensively: at least 100 MB or so on each of a few nodes, to keep all data in memory and only dump it once an hour.

Increasing the batch interval helps, but we want to keep it minimal.

The reason is probably not the space occupied by the state, but rather the large object graph: when we changed the list to a large array of primitives, the problem went away.
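As an illustration, here is a minimal sketch of what such a flatter state layout could look like. The class name PackedState, the fixed-width packing, and the ELEMENT_LENGTH constant are assumptions for illustration and are not from our actual code:

    // Alternative state model with a flat object graph: instead of a List holding
    // hundreds of String objects per key, all elements are packed into a single
    // char[] with fixed-width slots, so size estimation and serialization have far
    // fewer objects to traverse per key.
    static class PackedState implements java.io.Serializable {
        static final int ELEMENT_LENGTH = 10; // same element size as in the demo below
        final char[] packedData;              // one primitive array instead of List<String>

        PackedState(java.util.List<String> data) {
            packedData = new char[data.size() * ELEMENT_LENGTH];
            for (int i = 0; i < data.size(); i++) {
                data.get(i).getChars(0, ELEMENT_LENGTH, packedData, i * ELEMENT_LENGTH);
            }
        }

        String element(int i) { // read one element back on demand
            return new String(packedData, i * ELEMENT_LENGTH, ELEMENT_LENGTH);
        }
    }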

Just a guess: it might have something to do with org.apache.spark.util.SizeEstimator, which Spark uses internally, because it shows up while profiling from time to time.

[profiler screenshot omitted]

Here is a simple demo that reproduces the picture above on a modern Core i7:

  • less than 15 MB of state
  • no stream input at all
  • quickest possible (dummy) 'updateStateByKey' function
  • batch interval 1 second
  • checkpointing (required by Spark for stateful operations) to local disk
  • tested both locally and on YARN

Code:

package spark;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.SizeEstimator;
import scala.Tuple2;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SlowSparkStreamingUpdateStateDemo {

    // Very simple state model
    static class State implements Serializable {
        final List<String> data;
        State(List<String> data) {
            this.data = data;
        }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                // Tried KryoSerializer, but it does not seem to help much
                //.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .setMaster("local[*]")
                .setAppName(SlowSparkStreamingUpdateStateDemo.class.getName());

        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(conf, Durations.seconds(1));
        javaStreamingContext.checkpoint("checkpoint"); // a must (if you have stateful operation)

        List<Tuple2<String, State>> initialRddGeneratedData = prepareInitialRddData();
        System.out.println("Estimated size, bytes: " + SizeEstimator.estimate(initialRddGeneratedData));
        JavaPairRDD<String, State> initialRdd = javaStreamingContext.sparkContext().parallelizePairs(initialRddGeneratedData);

        JavaPairDStream<String, State> stream = javaStreamingContext
                .textFileStream(".") // fake: effectively, no input at all
                .mapToPair(input -> (Tuple2<String, State>) null) // fake to get JavaPairDStream
                .updateStateByKey(
                        (inputs, maybeState) -> maybeState, // simplest possible dummy function
                        new HashPartitioner(javaStreamingContext.sparkContext().defaultParallelism()),
                        initialRdd); // set generated state

        stream.foreachRDD(rdd -> { // simplest possible action (required by Spark)
            System.out.println("Is empty: " + rdd.isEmpty());
            return null;
        });

        javaStreamingContext.start();
        javaStreamingContext.awaitTermination();
    }

    private static List<Tuple2<String, State>> prepareInitialRddData() {
        // 'stateCount' tuples with value = list of size 'dataListSize' of strings of length 'elementDataSize'
        int stateCount = 1000;
        int dataListSize = 200;
        int elementDataSize = 10;
        List<Tuple2<String, State>> initialRddInput = new ArrayList<>(stateCount);
        for (int stateIdx = 0; stateIdx < stateCount; stateIdx++) {
            List<String> stateData = new ArrayList<>(dataListSize);
            for (int dataIdx = 0; dataIdx < dataListSize; dataIdx++) {
                stateData.add(RandomStringUtils.randomAlphanumeric(elementDataSize));
            }
            initialRddInput.add(new Tuple2<>("state" + stateIdx, new State(stateData)));
        }
        return initialRddInput;
    }
}
Asked by Andrii Rubtsov on Sep 10 '15



1 Answer

State management was improved in Spark 1.6; please refer to SPARK-2629, Improved state management for Spark Streaming.

See also the detailed design spec:
Improved state management in Spark Streaming

One performance drawback is mentioned there:

Need for more optimized state management that does not scan every key. The current updateStateByKey scans every key in every batch interval, even if there is no data for that key. While this semantics is useful in some workloads, most workloads require only scanning and updating the state for which there is new data, and only a small percentage of all the state needs to be touched in every batch interval. The cogroup-based implementation of updateStateByKey is not designed for this; cogroup scans all the keys every time. In fact, this causes the batch processing times of updateStateByKey to increase with the number of keys in the state, even if the data rate stays fixed.

[chart omitted: batch processing time growing with the number of keys in state]
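For reference, the work tracked in SPARK-2629 shipped as the mapWithState API in Spark 1.6, which invokes the state function only for keys that receive new data in a batch instead of cogrouping over every key. Below is a minimal Java sketch of switching to it; the helper class and method names, the Integer-valued stream, and the variables pairStream and initialRdd are placeholders rather than code from the question, and the org.apache.spark.api.java.Optional import assumes Spark 2.x (Spark 1.6 used Guava's Optional here):

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

class MapWithStateSketch {

    // Builds a stateful stream where the mapping function runs only for keys
    // that actually received data in the current batch.
    static JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> withState(
            JavaPairDStream<String, Integer> pairStream,   // placeholder input stream
            JavaPairRDD<String, Integer> initialRdd) {     // placeholder initial state

        Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
                (key, newValue, state) -> {
                    int sum = newValue.orElse(0) + (state.exists() ? state.get() : 0);
                    state.update(sum);             // persist the per-key running total
                    return new Tuple2<>(key, sum); // record emitted for this batch
                };

        return pairStream.mapWithState(
                StateSpec.function(mappingFunc)
                        .initialState(initialRdd)         // seed state, like updateStateByKey's initialRdd
                        .timeout(Durations.minutes(60))); // optionally drop keys idle for an hour
    }
}

With this approach, batch processing time tracks the amount of new data per batch rather than the total number of keys held in state.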

Answered by Shawn Guo on Oct 02 '22