 

Spark loses all executors one minute after starting

I run pyspark on an 8-node Google Dataproc cluster with default settings. A few seconds after starting I see 30 executor cores running (as expected):

    >>> sc.defaultParallelism
    30

One minute later:

    >>> sc.defaultParallelism
    2

From that point on, all actions run on only 2 cores:


    >>> rng = sc.parallelize(range(1,1000000))
    >>> rng.cache()
    >>> rng.count()
    >>> rng.getNumPartitions()
    2

If I run rng.cache() while the cores are still connected, they stay connected and jobs get distributed.
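(For what it's worth, here is a rough way I count the currently registered executors from the pyspark shell. It goes through the underlying JVM SparkContext via py4j rather than any stable public Python API, so treat it as a sketch that may differ between Spark versions:)

    >>> # Executor count via the underlying JVM SparkContext (not a public API);
    >>> # getExecutorMemoryStatus() includes the driver as one entry.
    >>> sc._jsc.sc().getExecutorMemoryStatus().size() - 1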

Checking the monitoring app (port 4040 on the master node) shows the executors are removed:

    Executor 1
    Removed at 2016/02/25 16:20:14
    Reason: Container container_1456414665542_0006_01_000002 exited from explicit termination request.

Is there some setting that could keep cores connected without workarounds?

Asked Feb 26 '16 by Tomas Vitulskis




1 Answer

For the most part, what you are seeing is just a difference in how Spark on YARN is configured compared to Spark standalone. At the moment, YARN's reporting of "VCores Used" doesn't correctly correspond to a real container reservation of cores; containers are actually sized based only on the memory reservation.

Overall there are a few things at play here:

Dynamic allocation causes Spark to relinquish idle executors back to YARN; unfortunately, at the moment Spark prints a spammy but harmless "lost executor" message when that happens. This addresses the classic problem of Spark on YARN, where Spark would grab the maximum number of containers it thought it needed and then never give them up, effectively paralyzing the clusters it ran on.

With dynamic allocation, when you start a long job, Spark quickly allocates new containers (with something like an exponential ramp-up so that it can fill a full YARN cluster within a couple of minutes), and when idle it relinquishes executors with a similar ramp-down, on an interval of about 60 seconds (if an executor is idle for 60 seconds, it gets relinquished).
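If you would rather keep dynamic allocation but make it less eager to give executors back, the relevant knobs are ordinary Spark properties: spark.dynamicAllocation.executorIdleTimeout (60s by default) and, for executors holding cached data, spark.dynamicAllocation.cachedExecutorIdleTimeout, which is why caching keeps them around. A rough sketch for a standalone pyspark script; the values are only illustrative:

    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .setAppName("keep-executors-longer")  # illustrative app name
            .set("spark.dynamicAllocation.enabled", "true")
            # Keep idle executors longer than the 60s default.
            .set("spark.dynamicAllocation.executorIdleTimeout", "600s")
            # Executors with cached blocks use this separate timeout.
            .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "3600s")
            # Never scale below this many executors.
            .set("spark.dynamicAllocation.minExecutors", "8"))

    sc = SparkContext(conf=conf)

The same properties can also be passed at submit time with --conf or through Dataproc's --properties flag, as in the commands below.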

If you want to disable dynamic allocation, you can run:

    spark-shell --conf spark.dynamicAllocation.enabled=false

or, when submitting a job through Dataproc:

    gcloud dataproc jobs submit spark --properties spark.dynamicAllocation.enabled=false --cluster <your-cluster> foo.jar

Alternatively, if you specify a fixed number of executors, it should also automatically disable dynamic allocation:

    spark-shell --conf spark.executor.instances=123

or:

    gcloud dataproc jobs submit spark --properties spark.executor.instances=123 --cluster <your-cluster> foo.jar
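Since the question uses pyspark rather than spark-shell, the same properties can be passed to the pyspark launcher with --conf, or set in a SparkConf when building the context in a script. A minimal sketch of the fixed-size variant (the executor count here is just an example):

    from pyspark import SparkConf, SparkContext

    # Per the note above, requesting a fixed number of executors should
    # also disable dynamic allocation automatically.
    conf = SparkConf().set("spark.executor.instances", "8")
    sc = SparkContext(conf=conf)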
Answered Nov 03 '22 by DoiT International