I am running a Spark application in YARN-client mode with six executors (each four cores and executor memory = 6 GB and Overhead = 4 GB, Spark version: 1.6.3 / 2.1.0).
I find that my executor memory keeps increasing until getting killed by the node manager; and it gives out the information that tells me to boost spark.yarn.excutor.memoryOverhead
.
I know that this parameter mainly control the size of memory allocated off-heap. But I don’t know when and how the Spark engine will use this part of memory. Also increasing that part of memory does not always solve my problem. Sometimes it works and sometimes not. It trends to be useless when the input data is large.
FYI, my application’s logic is quite simple. It means to combine the small files generated in one single day (one directory one day) into a single one and write back to HDFS. Here is the core code:
val df = spark.read.parquet(originpath)
.filter(s"m = ${ts.month} AND d = ${ts.day}")
.coalesce(400)
val dropDF = df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
dropDF.repartition(1).write
.mode(SaveMode.ErrorIfExists)
.parquet(targetpath)
The source file may have hundreds to thousands level’s partition. And the total parquet file is around 1 to 5 GB.
Also I find that in the step that shuffle reading data from different machines, the size of shuffle read is about four times larger than the input size, Which is wired or some principle I don’t know.
Anyway, I have done some search myself for this problem. Some article said that it’s on the direct buffer memory (I don’t set myself).
Some article said that people solve it with more frequent full GC.
Also, I find one people on Stack Overflow with a very similar situation: Ever increasing physical memory for a Spark application in YARN
This guy claimed that it’s a bug with parquet, but a comment questioned him. People in this mail list may also receive an email hours ago from blondowski who described this problem while writing JSON: Executors - running out of memory
So it looks like to be common question for different output format.
I hope someone with experience about this problem could make an explanation about this issue. Why does this happen and what is a reliable way to solve this problem?
I just do some investigation in these days with my colleague. Here is my thought: from spark 1.2, we use Netty with off-heap memory to reduce GC during shuffle and cache block transfer. In my case, if I try to increase the memory overhead big enough. I will get the Max direct buffer exception. When Netty do block transferring, there will be five threads by default to grab the data chunk to target executor. In my situation, one single chunk is too big to fit into the buffer. So gc won’t help here. My final solution is to do another repartition before the repartition(1). Just to make 10x times more partitions than original’s. In this way, I can reduce the size of each chunk Netty transfer. In this way I finally make it.
Also I want to say that it’s not a good choice to repartition a big dataset into single file. This extremely unbalanced scenario is kind of waste your compute resources.
Welcome to any comment, I still don't understand this part well.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With