Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

FetchFailedException or MetadataFetchFailedException when processing big data set

When I run the parsing code with 1 GB dataset it completes without any error. But, when I attempt 25 gb of data at a time I get below errors. I'm trying to understand how can I avoid below failures. Happy to hear any suggestions or ideas.

Differnt errors,

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0  org.apache.spark.shuffle.FetchFailedException: Failed to connect to ip-xxxxxxxx  org.apache.spark.shuffle.FetchFailedException: Error in opening FileSegmentManagedBuffer{file=/mnt/yarn/nm/usercache/xxxx/appcache/application_1450751731124_8446/blockmgr-8a7b17b8-f4c3-45e7-aea8-8b0a7481be55/08/shuffle_0_224_0.data, offset=12329181, length=2104094} 

Cluster Details:

Yarn: 8 Nodes
Total cores: 64
Memory: 500 GB
Spark Version: 1.5

Spark submit statement:

spark-submit --master yarn-cluster \                         --conf spark.dynamicAllocation.enabled=true \                         --conf spark.shuffle.service.enabled=true \                         --executor-memory 4g \                         --driver-memory 16g \                         --num-executors 50 \                         --deploy-mode cluster \                         --executor-cores 1 \                         --class my.parser \                         myparser.jar \                         -input xxx \                         -output xxxx \ 

One of stack trace:

at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460) at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456) at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183) at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47) at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 
like image 899
WoodChopper Avatar asked Jan 22 '16 07:01

WoodChopper


People also ask

What is fetch failed exception in spark?

A Fetch Failed Exception, reported in a shuffle reduce task, indicates the failure in reading of one or more shuffle blocks from the hosting executors. Debugging a FetchFailed Exception is quite challenging since it can occur due to multiple reasons.

What is spark executor memory overhead?

Memory overhead is the amount of off-heap memory allocated to each executor. By default, memory overhead is set to either 10% of executor memory or 384, whichever is higher.

What is spark SQL shuffle partitions?

The Spark SQL shuffle is a mechanism for redistributing or re-partitioning data so that the data is grouped differently across partitions, based on your data size you may need to reduce or increase the number of partitions of RDD/DataFrame using spark. sql. shuffle. partitions configuration or through code.


1 Answers

This error is almost guaranteed to be caused by memory issues on your executors. I can think of a couple of ways to address these types of problems.

1) You could try to run with more partitions (do a repartition on your dataframe). Memory issues typically arise when one or more partitions contain more data than will fit in memory.

2) I'm noticing that you have not explicitly set spark.yarn.executor.memoryOverhead, so it will default to max(386, 0.10* executorMemory) which in your case will be 400MB. That sounds low to me. I would try to increase it to say 1GB (note that if you increase memoryOverhead to 1GB, you need to lower --executor-memory to 3GB)

3) Look in the log files on the failing nodes. You want to look for the text "Killing container". If you see the text "running beyond physical memory limits", increasing memoryOverhead will - in my experience - solve the problem.

like image 105
Glennie Helles Sindholt Avatar answered Oct 24 '22 21:10

Glennie Helles Sindholt