Based on our experiments, we see that the internal processing costs of stateful Spark Streaming take a significant amount of time once the state grows beyond about a million objects. As a result, latency suffers, because we have to increase the batch interval to avoid unstable behavior (processing time > batch interval).
It has nothing to do with the specifics of our app, since it can be reproduced with the code below.
What exactly are the Spark internal processing/infrastructure costs that make handling user state take so much time? Are there any options for decreasing processing time besides simply increasing the batch interval?
We planned to use state extensively: at least 100 MB or so on each of a few nodes, keeping all data in memory and only dumping it once an hour.
Increasing the batch interval helps, but we want to keep the batch interval minimal.
The reason is probably not the space occupied by the state, but rather the size of the object graph, because when we changed the list to a large array of primitives, the problem went away.
Just a guess: it might have something to do with org.apache.spark.util.SizeEstimator, which is used internally by Spark, because it shows up in profiling from time to time.
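If that guess is right, flattening each value into a single primitive array should shrink the object graph that SizeEstimator has to walk (one object per key instead of hundreds of small Strings). A minimal sketch of such a comparison (illustrative only; FlatState and the placeholder payload are assumptions, not what our app actually stores):

import org.apache.spark.util.SizeEstimator;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class StateLayoutComparison {

    // Original layout: one List with many small String objects per key.
    static class ListState implements Serializable {
        final List<String> data;
        ListState(List<String> data) { this.data = data; }
    }

    // Alternative layout: all characters packed into one primitive array,
    // so the per-key object graph is a single object.
    static class FlatState implements Serializable {
        final char[] data;
        FlatState(char[] data) { this.data = data; }
    }

    public static void main(String[] args) {
        List<String> strings = new ArrayList<>();
        StringBuilder flat = new StringBuilder();
        for (int i = 0; i < 200; i++) {
            String s = String.valueOf(i); // placeholder payload
            strings.add(s);
            flat.append(s);
        }
        // SizeEstimator has to traverse every object reachable from the value,
        // so fewer objects means less work per key.
        System.out.println("List<String> state: " + SizeEstimator.estimate(new ListState(strings)) + " bytes");
        System.out.println("char[] state:       " + SizeEstimator.estimate(new FlatState(flat.toString().toCharArray())) + " bytes");
    }
}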
Here is a simple demo to reproduce the picture above on a modern Core i7:
Code:
package spark;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.SizeEstimator;
import scala.Tuple2;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SlowSparkStreamingUpdateStateDemo {

    // Very simple state model
    static class State implements Serializable {
        final List<String> data;

        State(List<String> data) {
            this.data = data;
        }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                // Tried KryoSerializer, but it does not seem to help much
                //.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .setMaster("local[*]")
                .setAppName(SlowSparkStreamingUpdateStateDemo.class.getName());

        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(conf, Durations.seconds(1));
        javaStreamingContext.checkpoint("checkpoint"); // a must (if you have stateful operation)

        List<Tuple2<String, State>> initialRddGeneratedData = prepareInitialRddData();
        System.out.println("Estimated size, bytes: " + SizeEstimator.estimate(initialRddGeneratedData));
        JavaPairRDD<String, State> initialRdd = javaStreamingContext.sparkContext().parallelizePairs(initialRddGeneratedData);

        JavaPairDStream<String, State> stream = javaStreamingContext
                .textFileStream(".") // fake: effectively, no input at all
                .mapToPair(input -> (Tuple2<String, State>) null) // fake to get JavaPairDStream
                .updateStateByKey(
                        (inputs, maybeState) -> maybeState, // simplest possible dummy function
                        new HashPartitioner(javaStreamingContext.sparkContext().defaultParallelism()),
                        initialRdd); // set generated state

        stream.foreachRDD(rdd -> { // simplest possible action (required by Spark)
            System.out.println("Is empty: " + rdd.isEmpty());
            return null;
        });

        javaStreamingContext.start();
        javaStreamingContext.awaitTermination();
    }

    private static List<Tuple2<String, State>> prepareInitialRddData() {
        // 'stateCount' tuples with value = list of size 'dataListSize' of strings of length 'elementDataSize'
        int stateCount = 1000;
        int dataListSize = 200;
        int elementDataSize = 10;
        List<Tuple2<String, State>> initialRddInput = new ArrayList<>(stateCount);
        for (int stateIdx = 0; stateIdx < stateCount; stateIdx++) {
            List<String> stateData = new ArrayList<>(dataListSize);
            for (int dataIdx = 0; dataIdx < dataListSize; dataIdx++) {
                stateData.add(RandomStringUtils.randomAlphanumeric(elementDataSize));
            }
            initialRddInput.add(new Tuple2<>("state" + stateIdx, new State(stateData)));
        }
        return initialRddInput;
    }
}
Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches. Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data.
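For illustration, a minimal sketch of that model (the socket source on localhost:9999 is only an assumption for the example): each 1-second batch of input becomes one RDD inside the DStream, and the same transformation is applied to every batch by the Spark engine.

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class DStreamBatchingSketch {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("DStreamBatchingSketch");
        // Every second of input becomes one batch (one RDD) inside the DStream.
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));

        JavaDStream<String> lines = ssc.socketTextStream("localhost", 9999);
        JavaDStream<Integer> lineLengths = lines.map(String::length); // applied batch by batch

        lineLengths.print(); // prints the first elements of each batch's result RDD

        ssc.start();
        ssc.awaitTermination();
    }
}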
The purpose of the state store is to provide a reliable place from which the engine can read the intermediary results of Structured Streaming aggregations. Thanks to this, Spark can recover the processing state to the point before a failure, even in the case of driver failure. In the analyzed version (2.2.x), the state is kept in executor memory and persisted as files under the checkpoint location.
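A minimal sketch of where this state store comes into play (illustrative; the events directory, schema, and checkpoint path are assumptions): the running counts below are the intermediary aggregation results persisted under the checkpoint location, which is what lets the query resume after a failure.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class StateStoreSketch {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("StateStoreSketch")
                .getOrCreate();

        // File source needs an explicit schema.
        StructType schema = new StructType().add("eventType", DataTypes.StringType);
        Dataset<Row> events = spark.readStream().schema(schema).json("events");

        // Stateful aggregation: per-key counts are kept in the state store between micro-batches.
        Dataset<Row> counts = events.groupBy("eventType").count();

        StreamingQuery query = counts.writeStream()
                .outputMode("complete")
                .format("console")
                .option("checkpointLocation", "checkpoint-structured") // where state snapshots go
                .start();

        query.awaitTermination();
    }
}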
Another advantage of using a big data processing framework like Apache Spark is that we can combine batch processing and streaming processing in the same system. We can also apply Spark's machine learning and graph processing algorithms on data streams.
State management was improved in Spark 1.6.
Please refer to SPARK-2629, Improved state management for Spark Streaming,
and to the detailed design spec:
Improved state management in Spark Streaming
One performance drawback is mentioned below:
Need for more optimized state management that does not scan every key: the current updateStateByKey scans every key in every batch interval, even if there is no data for that key. While these semantics are useful in some workloads, most workloads require only scanning and updating the state for which there is new data, and only a small percentage of all the state needs to be touched in every batch interval. The cogroup-based implementation of updateStateByKey is not designed for this; cogroup scans all the keys every time. In fact, this causes the batch processing times of updateStateByKey to increase with the number of keys in the state, even if the data rate stays fixed.
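The outcome of that work is the mapWithState API added in Spark 1.6, which only invokes the update function for keys that receive new data in a batch. A minimal sketch of how a running per-key count could look with it (illustrative only; it assumes the Spark 2.x Java API, where Optional is org.apache.spark.api.java.Optional):

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

public class MapWithStateSketch {

    // Running count per key: only keys present in the current batch are passed to this function,
    // so batch time no longer grows with the total number of keys held in state.
    static JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> withRunningCounts(
            JavaPairDStream<String, Integer> pairs,
            JavaPairRDD<String, Integer> initialState) {

        Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
                (key, value, state) -> {
                    int newData = value.isPresent() ? value.get() : 0;
                    int sum = newData + (state.exists() ? state.get() : 0);
                    state.update(sum);             // state is updated only for keys seen in this batch
                    return new Tuple2<>(key, sum); // record emitted for this key
                };

        return pairs.mapWithState(
                StateSpec.function(mappingFunc)
                         .initialState(initialState) // seed state, like updateStateByKey's initialRdd
                         .numPartitions(4));
    }
}

Because the state for untouched keys is carried over without being scanned, batch times stay roughly proportional to the amount of new data rather than to the total number of keys in state.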