I run PySpark on an 8-node Google Dataproc cluster with default settings.
A few seconds after starting, I see 30 executor cores running (as expected):
>>> sc.defaultParallelism
30
One minute later:
>>> sc.defaultParallelism
2
From that point on, all actions run on only 2 cores:
>>> rng = sc.parallelize(range(1,1000000))
>>> rng.cache()
>>> rng.count()
>>> rng.getNumPartitions()
2
If I run rng.cache() while the cores are still connected, they stay connected and jobs get distributed.
Checking the monitoring app (port 4040 on the master node) shows that the executors were removed:
Executor 1
Removed at 2016/02/25 16:20:14
Reason: Container container_1456414665542_0006_01_000002 exited from explicit termination request.
Is there a setting that could keep the cores connected without workarounds?
For the most part, what you are seeing is actually just the difference in how Spark on YARN can be configured versus Spark standalone. At the moment, YARN's reporting of "VCores Used" doesn't actually correspond to a real container reservation of cores; containers are actually based only on the memory reservation.
Overall there are a few things at play here:
Dynamic allocation causes Spark to relinquish idle executors back to YARN, and unfortunately at the moment Spark prints that spammy but harmless "lost executor" message. This was the classical problem of Spark on YARN, where Spark originally paralyzed the clusters it ran on because it would grab the maximum number of containers it thought it needed and then never give them up.
With dynamic allocation, when you start a long job, Spark quickly allocates new containers (with something like an exponential ramp-up so it can fill a full YARN cluster within a couple of minutes), and when idle it relinquishes executors with the same ramp-down, at an interval of about 60 seconds (if idle for 60 seconds, relinquish some executors).
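If you would rather keep dynamic allocation and just hold on to idle executors longer, that 60-second ramp-down is governed by the spark.dynamicAllocation.executorIdleTimeout property (you can also pass it with --conf or --properties, as in the examples below). A minimal sketch, assuming you build the context yourself in a script rather than using the one the pyspark shell provides; the 600s value is an arbitrary illustration, not a recommendation:

from pyspark import SparkConf, SparkContext

# Keep dynamic allocation, but let executors sit idle for 10 minutes
# before they are relinquished (illustrative value, default is 60s).
conf = SparkConf().set("spark.dynamicAllocation.executorIdleTimeout", "600s")
sc = SparkContext(conf=conf)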
If you want to disable dynamic allocation you can run:
spark-shell --conf spark.dynamicAllocation.enabled=false
gcloud dataproc jobs submit spark --properties spark.dynamicAllocation.enabled=false --cluster <your-cluster> foo.jar
Alternatively, if you specify a fixed number of executors, it should also automatically disable dynamic allocation:
spark-shell --conf spark.executor.instances=123
gcloud dataproc jobs submit spark --properties spark.executor.instances=123 --cluster <your-cluster> foo.jar
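The same properties apply to the pyspark shell and to PySpark scripts. If you construct the context yourself in a script, a minimal sketch would look like the following; the instance count of 8 is an arbitrary illustration, so size it to your cluster:

from pyspark import SparkConf, SparkContext

# Disable dynamic allocation and pin a fixed number of executors
# (illustrative value).
conf = (SparkConf()
        .set("spark.dynamicAllocation.enabled", "false")
        .set("spark.executor.instances", "8"))
sc = SparkContext(conf=conf)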