
Why does Spark fail with java.lang.OutOfMemoryError: GC overhead limit exceeded?

I'm trying to implement in Spark a Hadoop Map/Reduce job that worked fine before. The Spark app definition is the following:

val data = spark.textFile(file, 2).cache()
val result = data
  .map(/* some pre-processing */)
  .map(docWeightPar => (docWeightPar(0), docWeightPar(1)))
  .flatMap(line => MyFunctions.combine(line))
  .reduceByKey(_ + _)

Where MyFunctions.combine is

def combine(tuples: Array[(String, String)]): IndexedSeq[(String, Double)] =
  for (i <- 0 to tuples.length - 2;
       j <- 1 to tuples.length - 1)
  yield (toKey(tuples(i)._1, tuples(j)._1),
         tuples(i)._2.toDouble * tuples(j)._2.toDouble)

The combine function produces lots of map keys if the input list is big, and this is where the exception is thrown.

In the Hadoop Map/Reduce setting I didn't have problems, because the point where the combine function yields is the point where Hadoop wrote the map pairs to disk. Spark seems to keep everything in memory until it explodes with a java.lang.OutOfMemoryError: GC overhead limit exceeded.
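To put rough numbers on it: for a line with n tuples, the two generators above yield (n - 1) * (n - 1) pairs, so the intermediate data grows quadratically with the line length. A quick sketch with an illustrative n:

// Illustrative only: with i <- 0 to n - 2 and j <- 1 to n - 1,
// combine emits (n - 1) * (n - 1) pairs per input line.
val n = 10000L
val pairsPerLine = (n - 1) * (n - 1)  // ≈ 100 million (key, value) pairs on the heap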

I am probably doing something really basic wrong, but I couldn't find any pointers on how to move forward from this, and I would like to know how I can avoid it. Since I am a total noob at Scala and Spark, I am not sure if the problem comes from one or the other, or both. I am currently trying to run this program on my own laptop, and it works for inputs where the length of the tuples array is not very long. Thanks in advance.

Asked Dec 13 '14 by Augusto


People also ask

What causes GC overhead limit exceeded?

The "java.lang.OutOfMemoryError: GC overhead limit exceeded" error indicates that the NameNode heap size is insufficient for the amount of HDFS data in the cluster. Increase the heap size to prevent out-of-memory exceptions.

How do I reduce the GC time on my spark?

To avoid full GC in G1 GC, there are two commonly-used approaches; one is to decrease the InitiatingHeapOccupancyPercent option's value (the default is 45), so that G1 GC starts its initial concurrent marking earlier and a full GC is more likely to be avoided.
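For instance, a minimal sketch of how those GC options could be passed to the executors (the threshold value 35 is illustrative, not a tuned recommendation):

// Illustrative sketch: enable G1 GC on executors and lower the IHOP threshold
// so concurrent marking starts earlier.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions",
       "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35")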

How do you fix heap memory in spark?

You can resolve it by increasing the number of shuffle partitions: raise the value of spark.sql.shuffle.partitions.
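A minimal sketch of what that could look like (400 is just an example value, and spark here is assumed to be a SparkSession):

// Illustrative only: raise the number of shuffle partitions so each task
// processes a smaller slice of the shuffled data.
spark.conf.set("spark.sql.shuffle.partitions", "400")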


2 Answers

Add the following JVM arg when you launch spark-shell or spark-submit:

-Dspark.executor.memory=6g 
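If you prefer setting it in code rather than as a JVM argument, a minimal sketch (the 6g value is just the example used above):

// Equivalent configuration via SparkConf instead of -Dspark.executor.memory=6g.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("MyApp")
  .set("spark.executor.memory", "6g")
val sc = new SparkContext(conf)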

You may also consider explicitly setting the number of workers when you create an instance of SparkContext:

Distributed Cluster

Set the slave names in the conf/slaves:

val sc = new SparkContext("master", "MyApp") 
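For running on a single laptop, as the question describes, the number of local worker threads can instead be set in the master URL; a small sketch (the thread count is arbitrary):

// Illustrative: run locally with 4 worker threads ("local[*]" would use all cores).
import org.apache.spark.SparkContext

val sc = new SparkContext("local[4]", "MyApp")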
Answered Sep 20 '22 by WestCoastProjects


In the documentation (http://spark.apache.org/docs/latest/running-on-yarn.html) you can read how to configure the executors and the memory limit. For example:

--master yarn-cluster --num-executors 10 --executor-cores 3 --executor-memory 4g --driver-memory 5g  --conf spark.yarn.executor.memoryOverhead=409 

The memoryOverhead should be 10% of the executor memory.
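As a quick sanity check of that number (simple arithmetic, nothing more):

// 10% of a 4g executor: 0.10 * 4096 MB ≈ 409 MB, matching the value above.
val executorMemoryMb = 4096
val memoryOverheadMb = (executorMemoryMb * 0.10).toInt  // 409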

Edit: Fixed 4096 to 409 (Comment below refers to this)

Answered Sep 19 '22 by Carlos AG