 

Memory issue with spark structured streaming

I'm facing memory issues running a structured streaming query with aggregation and partitioning in Spark 2.2.0:

session
    .readStream()
    .schema(inputSchema)
    .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
    .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
    .csv("s3://test-bucket/input")
    .as(Encoders.bean(TestRecord.class))
    .flatMap(mf, Encoders.bean(TestRecord.class))
    .dropDuplicates("testId", "testName")
    .withColumn("year", functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType), "YYYY"))
    .writeStream()
    .option("path", "s3://test-bucket/output")
    .option("checkpointLocation", "s3://test-bucket/checkpoint")
    .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
    .partitionBy("year")
    .format("parquet")
    .outputMode(OutputMode.Append())
    .queryName("test-stream")
    .start();

During testing I noticed that the amount of used memory increases each time new data arrives, and eventually the executors exit with code 137:

ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1520214726510_0001_01_000003 on host: ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: Container killed on request. Exit code is 137
Container exited with a non-zero exit code 137
Killed by external signal

I've created a heap dump and found that most of the memory is used by org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider, which is referenced from StateStore.

At first glance this looks normal, since that is how Spark keeps aggregation keys in memory. However, I did my testing by renaming files in the source folder so that Spark would pick them up again. Since the input records are identical, all further rows should be rejected as duplicates and memory consumption shouldn't increase, yet it does.

[Image: executor memory usage]

Moreover, GC time accounted for more than 30% of total processing time.


Here is a heap dump taken from an executor running with less memory than in the screenshots above; when I tried to create a dump from that executor, the Java process terminated in the middle of the process.


asked Mar 11 '18 by Yuriy Bondaruk


People also ask

How do I fix a Spark memory problem?

You can resolve it by increasing the number of shuffle partitions: raise the value of spark.sql.shuffle.partitions.
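
For reference, a minimal sketch of that setting; the session variable and the value 400 are illustrative, not taken from the question:

import org.apache.spark.sql.SparkSession;

// Raise the number of shuffle partitions (default 200) so each
// aggregation/shuffle task processes a smaller slice of data
SparkSession session = SparkSession.builder().appName("example").getOrCreate();
session.conf().set("spark.sql.shuffle.partitions", "400");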

What is memory leak in Spark?

A memory leak in the application means the application master runs out of its Xmx heap, gets killed, restarts, and is eventually killed for good. In a leaking Spark application, the RSS memory of the process keeps growing very slowly until it is eventually killed by the NodeManager.

What is the difference between Spark streaming and structured streaming?

Spark Streaming receives real-time data and divides it into smaller batches for the execution engine. In contrast, Structured Streaming is built on the Spark SQL API for data stream processing. In the end, both APIs are optimized by the Spark Catalyst optimizer and translated into RDDs for execution under the hood.
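
To make the contrast concrete, here is a rough side-by-side sketch; the app names are placeholders and inputSchema is assumed to be the StructType from the question:

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Classic Spark Streaming: explicit micro-batches on a StreamingContext
SparkConf conf = new SparkConf().setAppName("dstream-example");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(60));
JavaDStream<String> lines = jssc.textFileStream("s3://test-bucket/input");

// Structured Streaming: the same source treated as an unbounded table,
// planned through the Spark SQL engine
SparkSession session = SparkSession.builder().appName("structured-example").getOrCreate();
Dataset<Row> df = session.readStream().schema(inputSchema).csv("s3://test-bucket/input");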

How does Spark use memory?

Memory usage in Spark largely falls under one of two categories: execution and storage. Execution memory refers to that used for computation in shuffles, joins, sorts and aggregations, while storage memory refers to that used for caching and propagating internal data across the cluster.
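
The split between the two is governed by a couple of settings; a sketch with what I believe are the defaults (verify against your Spark version):

import org.apache.spark.SparkConf;

// spark.memory.fraction: share of the heap (minus a ~300 MB reserve) used
// for execution and storage combined; spark.memory.storageFraction: the
// part of that region protected from eviction for cached data
SparkConf conf = new SparkConf()
    .set("spark.memory.fraction", "0.6")
    .set("spark.memory.storageFraction", "0.5");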


2 Answers

Migrating my comment from SPARK-23682, which the asker of this question also filed.

The HDFS state store provider excessively caches multiple versions of state in memory, 100 versions by default. The issue is addressed by SPARK-24717, which maintains only two versions of state in memory (the current one for replay and the new one for update). The patch will be available in Spark 2.4.0.
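
If it helps while testing on 2.4.0, the retention count is exposed as a configuration; a minimal sketch, assuming the property name introduced by SPARK-24717:

// Spark 2.4.0+: cap how many state versions HDFSBackedStateStoreProvider
// keeps in memory (2 is the post-SPARK-24717 default)
session.conf().set("spark.sql.streaming.maxBatchesToRetainInMemory", "2");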

answered Oct 03 '22 by Jungtaek Lim


I think the root cause is that you do not use a watermark together with dropDuplicates, so all the state is kept and never dropped. Have a look at: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#streaming-deduplication
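
A minimal sketch of the question's pipeline with a watermark; the 24-hour delay threshold is an assumption, and the event-time column is added to the deduplication key as in the linked guide so expired state can be evicted:

Dataset<TestRecord> deduped = session
    .readStream()
    .schema(inputSchema)
    .csv("s3://test-bucket/input")
    .as(Encoders.bean(TestRecord.class))
    // declare how late data may arrive; deduplication state older than
    // the watermark can then be dropped instead of accumulating forever
    .withWatermark("testTimestamp", "24 hours")
    .dropDuplicates("testId", "testName", "testTimestamp");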

answered Oct 03 '22 by Kevin