I have the following code, which fires hiveContext.sql() most of the time. My task is to create a few tables and insert values into them after processing, for every Hive table partition.

So I first fire show partitions and, using its output in a for-loop, I call a few methods that create the tables (if they don't exist) and insert into them using hiveContext.sql.
Now, hiveContext can't be executed inside an executor, so I have to run this for-loop in the driver program, serially, one partition at a time. When I submit this Spark job on a YARN cluster, my executors almost always get lost because of a shuffle-not-found exception.

This is happening because YARN kills my executors for exceeding their memory limit. I don't understand why, since I have a very small data set for each Hive partition, yet it still causes YARN to kill an executor.

Will the following code do everything in parallel and try to hold the data of all Hive partitions in memory at the same time?
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;

public static void main(String[] args) throws IOException {
    SparkConf conf = new SparkConf();
    SparkContext sc = new SparkContext(conf);
    HiveContext hc = new HiveContext(sc);
    FileSystem fs = FileSystem.get(sc.hadoopConfiguration());

    // Collect the partition specs (e.g. "server=xyz/date=2015-08-05") to the driver.
    DataFrame partitionFrame = hc.sql("show partitions dbdata partition(date='2015-08-05')");
    Row[] rowArr = partitionFrame.collect();

    for (Row row : rowArr) {
        String[] splitArr = row.getString(0).split("/");
        String server = splitArr[0].split("=")[1];
        String date = splitArr[1].split("=")[1];

        String csvPath = "hdfs:///user/db/ext/" + server + ".csv";
        if (fs.exists(new Path(csvPath))) {
            hc.sql("ADD FILE " + csvPath);
        }

        // Create each target table if it doesn't exist and insert this partition's data.
        createInsertIntoTableABC(hc, server, date);
        createInsertIntoTableDEF(hc, server, date);
        createInsertIntoTableGHI(hc, server, date);
        createInsertIntoTableJKL(hc, server, date);
        createInsertIntoTableMNO(hc, server, date);
    }
}
You can resolve it by adjusting the shuffle partitioning: increase the value of spark.sql.shuffle.partitions so each shuffle partition holds less data.
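For example, reusing the hc HiveContext from the question's code, a minimal sketch (the value 400 is only an illustration; tune it to your data volume):

// Raise the number of shuffle partitions so each shuffle task handles less data.
hc.setConf("spark.sql.shuffle.partitions", "400");

// Equivalently, set it once on the SparkConf before the contexts are created:
// new SparkConf().set("spark.sql.shuffle.partitions", "400");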
With, for example, --executor-memory 8g and spark.yarn.executor.memoryOverhead set to 800, you have three limits: your executor JVM cannot use more than 8 GB of memory, the non-JVM processes in the container cannot use more than 800 MB, and the container has a maximum physical limit of 8.8 GB (the sum of the two).
According to the recommendations discussed above (this example assumes 10 nodes with 64 GB of memory each and 150 usable cores in total, running 5 cores per executor):
- Number of available executors = (total cores / num-cores-per-executor) = 150 / 5 = 30
- Leaving 1 executor for the YARN ApplicationMaster => --num-executors = 29
- Number of executors per node = 30 / 10 = 3
- Memory per executor = 64 GB / 3 = 21 GB
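Translated into configuration, that sizing would look roughly like the sketch below. The numbers are illustrative and assume the hypothetical cluster above; the 21 GB per executor still has to leave room for the YARN memory overhead, so the heap is set a little lower.

// Illustrative sizing derived from the arithmetic above; adjust to your own hardware.
SparkConf conf = new SparkConf()
        .set("spark.executor.instances", "29")                // 30 executors minus 1 for the ApplicationMaster
        .set("spark.executor.cores", "5")
        .set("spark.executor.memory", "19g")                  // ~21 GB minus the YARN overhead share
        .set("spark.yarn.executor.memoryOverhead", "2048");   // MB reserved for non-JVM memory in the container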
The most likely cause of this exception is that not enough heap memory is allocated to the Java virtual machines (JVMs). These JVMs are launched as executors or drivers as part of the Apache Spark application.
Generally, you should always dig into logs to get the real exception out (at least in Spark 1.3.1).
tl;dr: a safe config for Spark under YARN:
- spark.shuffle.memoryFraction=0.5 - this allows the shuffle to use more of the allocated memory
- spark.yarn.executor.memoryOverhead=1024 - this is set in MB. YARN kills executors when their memory usage is larger than (executor-memory + executor.memoryOverhead)
Little more info
From reading your question, you mention that you get a shuffle-not-found exception.

In the case of
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle
you should increase spark.shuffle.memoryFraction, for example to 0.5.
The most common reason for YARN killing off my executors was memory usage beyond what it expected. To avoid that, increase spark.yarn.executor.memoryOverhead; I've set it to 1024, even though my executors use only 2-3 GB of memory.
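Put together, a sketch of how the tl;dr settings above could be applied in the driver code (same imports as in the question's code; both options can also be passed as --conf flags to spark-submit instead):

// Apply the suggested "safe" settings before creating the contexts.
SparkConf conf = new SparkConf()
        .set("spark.shuffle.memoryFraction", "0.5")           // give the shuffle a larger share of the executor heap
        .set("spark.yarn.executor.memoryOverhead", "1024");   // extra MB YARN grants on top of executor-memory
SparkContext sc = new SparkContext(conf);
HiveContext hc = new HiveContext(sc);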