I have a dataset of around 190 GB that was partitioned into 1000 partitions.
My EMR cluster allows a maximum of 10 r5a.2xlarge TASK nodes and 2 CORE nodes, each with 64 GB of memory and 128 GB of EBS storage.
In my Spark job, I have set executor-cores 5, driver-cores 5, executor-memory 40g, driver-memory 50g, spark.yarn.executor.memoryOverhead=10g, spark.sql.shuffle.partitions=500, and spark.dynamicAllocation.enabled=true.
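To sanity-check these settings, it helps to work out how many executor containers fit on one node and roughly how much data each shuffle partition carries. This is a sketch under stated assumptions: YARN can hand out about 56 GB per node (the ceiling mentioned later in this post), each r5a.2xlarge has 8 vCPUs, the driver's own container is ignored, and all 190 GB is treated as shuffled, which is an upper bound rather than a measurement.

```python
import math

# Assumptions (not measured): about 56 GB of YARN memory and 8 vCPUs per node.
YARN_MEM_PER_NODE_GB = 56
VCPUS_PER_NODE = 8
NODES = 10 + 2          # 10 TASK nodes + 2 CORE nodes
EXECUTOR_CORES = 5

def executors_per_node(executor_mem_gb, overhead_gb, executor_cores):
    """Executor containers that fit on one node, limited by memory and by cores."""
    by_memory = YARN_MEM_PER_NODE_GB // (executor_mem_gb + overhead_gb)
    by_cores = VCPUS_PER_NODE // executor_cores
    return min(by_memory, by_cores)

def shuffle_partition_mb(data_gb, partitions):
    """Average shuffle partition size if all the data were shuffled (1 GB = 1024 MB)."""
    return data_gb * 1024 / partitions

per_node = executors_per_node(40, 10, EXECUTOR_CORES)
total_cores = per_node * NODES * EXECUTOR_CORES   # upper bound: ignores the driver

print(per_node)                          # 1
print(total_cores)                       # 60
print(shuffle_partition_mb(190, 500))    # 389.12
print(math.ceil(500 / total_cores))      # 9 task waves per shuffle stage
```

With these settings each node holds a single 50 GB executor, so the whole cluster runs at most 60 tasks at a time.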
But my job keeps failing with errors like:
spark.shuffle.MetadataFetchFailedException
spark.shuffle.FetchFailedException
java.io.IOException: No space left on device
Container Lost
etc.
A lot of the answers to these kinds of issues that I found online say to increase memoryOverhead, which I did, from 2G to 10G. My total executor memory plus memoryOverhead is 50G, with 40G allocated to the executor and 10G to the overhead. But I think I am reaching the limit, since I won't be able to go above 56G.
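The arithmetic behind this limit: Spark requests one YARN container per executor, sized at executor memory plus overhead, and a request larger than the cluster's maximum container size is refused. The sketch below assumes that maximum is about 56 GB, the figure mentioned above; it is illustrative only.

```python
# Assumption: the largest container YARN will grant is about 56 GB.
MAX_CONTAINER_GB = 56

def container_gb(executor_mem_gb, overhead_gb):
    """Size of the YARN container Spark requests for one executor."""
    return executor_mem_gb + overhead_gb

def fits(executor_mem_gb, overhead_gb):
    """True if heap + overhead stays under the container ceiling."""
    return container_gb(executor_mem_gb, overhead_gb) <= MAX_CONTAINER_GB

def max_overhead_gb(executor_mem_gb):
    """Largest overhead that still fits next to a given heap size."""
    return MAX_CONTAINER_GB - executor_mem_gb

print(fits(40, 10))           # True: 50 GB container
print(max_overhead_gb(40))    # 16
print(fits(28, 28))           # True, but the heap drops from 40 GB to 28 GB
print(fits(40, 20))           # False: 60 GB exceeds the ceiling
```

A 50/50 split does not make the container any bigger; it only moves memory out of the JVM heap and into the overhead region, which Spark uses for non-heap needs such as VM overheads and native memory. It helps only if the overhead, not the heap, is what is running out.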
I thought I had done everything possible to optimize my Spark job, but it still fails. Is there anything else I can try? Should I increase my overhead even more, so that the executor memory/overhead split is 50/50? The memory profile of my job from Ganglia looks something like this:
(The steep drop is when the cluster removed all the executor nodes because they had died.)

Any insight would be greatly appreciated.
Thank you!
EDIT: [SOLUTION]
I am appending to my post the exact solution that solved my problem, thanks to Debuggerrr's suggestions in his answer.
1. Using the persist() method (suggested by Debuggerrr), I was able to save the data to memory and disk (MEMORY_AND_DISK) and simply call it back without parts of it being cleaned up by the GC.
2. spark.dynamicAllocation.enabled: a blog I found states that it is best to set this property to false if you calculate the resources manually, since Spark tends to misallocate resources if your calculation doesn't line up with its own. Once I set it to false and set the correct executor and Spark attributes, it worked like a charm!

[EDIT 2]: The parameters that specifically worked for my job are:
--executor-cores 5 --driver-cores 5 --executor-memory 44g --driver-memory 44g --num-executors 9 --conf spark.default.parallelism=100 --conf spark.sql.shuffle.partitions=300 --conf spark.yarn.executor.memoryOverhead=11g --conf spark.shuffle.io.retryWait=180s --conf spark.network.timeout=800s --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.dynamicAllocation.enabled=false
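A quick check of why these numbers fit the cluster. This sketch assumes the 56 GB per-container ceiling mentioned in the question, 12 nodes with one container each, and that the job runs in cluster mode, so the driver also needs a container; since the flags do not set a driver overhead, Spark's default of max(384 MB, 10% of driver memory) is assumed.

```python
import math

MAX_CONTAINER_GB = 56   # assumed per-container ceiling
NODES = 12              # 10 TASK + 2 CORE nodes, assumed one container per node

def default_overhead_gb(mem_gb):
    """Spark's default overhead: 10% of the memory, with a 384 MB floor."""
    return max(384 / 1024, 0.10 * mem_gb)

executor_container = 44 + 11                      # --executor-memory + memoryOverhead
driver_container = 44 + default_overhead_gb(44)   # --driver-memory + default overhead

print(executor_container <= MAX_CONTAINER_GB)     # True (55 GB)
print(driver_container <= MAX_CONTAINER_GB)       # True (about 48.4 GB)
print(9 + 1 <= NODES)                             # True: 9 executors + 1 driver
print(round(11 / 44, 2))                          # 0.25: overhead is 25% of the heap
print(math.ceil(300 / (9 * 5)))                   # 7 task waves for 300 shuffle partitions
```

With dynamic allocation off, these ten containers are requested up front instead of growing and shrinking during the job.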
You can try any of the steps below:
1. Memory overhead should be 10% of the executor memory or 384 MB, whichever is larger. Don't just increase it to an arbitrary value.
2. Calculate the number of executors. You have to calculate it in such a way that you leave some space for YARN and background processes. Also, you can try increasing the cores by 1 or 2.
3. Run in cluster mode, and whatever number you assign to executors, add 1 to it, since one executor will be treated as the driver in cluster mode.
4. Run the code in spark-shell on EMR, and you will find out which part of the code is taking the most time to run.
You can also refer to this official blog for some of the tips.
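The first three tips can be sketched as follows. The overhead function follows Spark's documented default (10% of executor memory with a 384 MB minimum). The executor-count function is a hypothetical heuristic, not something from the answer: it assumes 8 vCPUs per r5a.2xlarge, reserves one core per node for the OS and YARN daemons, and subtracts one container for the driver in cluster mode.

```python
def default_overhead_mb(executor_mem_mb):
    """Spark's default executor overhead: 10% of executor memory, at least 384 MB."""
    return max(384, int(0.10 * executor_mem_mb))

def executor_count(nodes, vcpus_per_node, cores_per_executor, reserved_cores_per_node=1):
    """Hypothetical heuristic: keep some cores per node for the OS and YARN daemons,
    pack executors into what is left, then give one container to the driver
    (cluster mode), so --num-executors = containers - 1."""
    per_node = (vcpus_per_node - reserved_cores_per_node) // cores_per_executor
    containers = per_node * nodes
    return containers - 1

print(default_overhead_mb(2 * 1024))    # 384: the floor applies below 3.75 GB
print(default_overhead_mb(40 * 1024))   # 4096: 10% of a 40 GB heap
print(executor_count(nodes=12, vcpus_per_node=8, cores_per_executor=5))  # 11
```

Memory still has to be checked separately: each executor's heap plus overhead must fit in the container size YARN allows on a node.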