A map operation in my Spark app takes an RDD[A] as input and maps each element of RDD[A] to an object of type B using a custom mapping function func(x: A): B. Because func() requires a significant amount of memory to compute each input x, I want to limit the number of concurrent map tasks per executor so that the total memory required by all tasks on the same executor does not exceed the physical memory available on the node.
I looked through the available Spark configurations, but I am not sure which one to use. Does setting the number of partitions of RDD[A] with coalesce(numPartitions) fulfil this purpose?
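For context, the job has roughly the following shape. This is a minimal sketch written as if pasted into spark-shell (where sc is already defined); the types A and B, the payload sizes, and func itself are made-up stand-ins for the real, memory-hungry mapping logic:

import org.apache.spark.rdd.RDD

// Hypothetical stand-ins for the real A, B and the memory-hungry func().
case class A(payload: Array[Byte])
case class B(summary: Int)

def func(x: A): B = B(x.payload.length)

val input: RDD[A] = sc.parallelize(Seq.fill(1000)(A(Array.fill(1024)(0: Byte))))
val result: RDD[B] = input.map(func)   // func() runs once per element, inside each task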
The number of concurrent tasks per executor is determined by the number of available cores, not by the number of tasks, so changing the parallelism level with coalesce or repartition will not help constrain the memory used by each task, only the amount of data in each partition that a given task has to process (*).
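A quick way to see this (again in spark-shell, with purely illustrative numbers): repartitioning only changes how many tasks the stage has in total; each executor still schedules up to spark.executor.cores / spark.task.cpus of them at the same time.

// Assuming sc is already defined (spark-shell); numbers are illustrative.
val data = sc.parallelize(1 to 100000)

val fewerPartitions = data.coalesce(4)        // the map stage now has 4 tasks in total
val morePartitions  = data.repartition(400)   // or 400 smaller tasks in total

// Either way, an executor with e.g. 8 cores (and spark.task.cpus = 1) still runs
// up to 8 of those tasks at the same time; only the data volume per task changes.
println(fewerPartitions.getNumPartitions)     // 4
println(morePartitions.getNumPartitions)      // 400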
As far as I know, there's no way to constrain the memory used by a single task, because it's sharing the resources of the worker JVM, and hence sharing memory with the other tasks on the same executor.
Assuming a fair share per task, a guideline for the amount of memory available per task (core) will be:
spark.executor.memory * spark.storage.memoryFraction / #cores-per-executor
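As a rough, back-of-the-envelope illustration of that guideline (all figures below are made up, not recommendations):

// Hypothetical numbers only: 8 GB executors, the (old, static) storage fraction of 0.6,
// and 4 cores per executor.
val executorMemoryMb = 8 * 1024                // spark.executor.memory = 8g
val storageMemoryFraction = 0.6                // spark.storage.memoryFraction
val coresPerExecutor = 4                       // spark.executor.cores

val memoryPerTaskMb = executorMemoryMb * storageMemoryFraction / coresPerExecutor
println(f"~$memoryPerTaskMb%.0f MB per concurrently running task")   // ~1229 MB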
A possible way to force fewer tasks per executor, and hence leave more memory available per task, is to assign more cores per task using spark.task.cpus (default = 1).
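For example (the concrete values here are only illustrative), with 8 executor cores and spark.task.cpus = 4 the scheduler can place at most 8 / 4 = 2 tasks on an executor at any one time:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values: 8 cores per executor and 4 cores reserved per task
// => at most 2 concurrently running tasks per executor.
val conf = new SparkConf()
  .setAppName("limit-concurrent-map-tasks")
  .set("spark.executor.cores", "8")
  .set("spark.task.cpus", "4")
val sc = new SparkContext(conf)

The same settings can also be passed on the command line, e.g. spark-submit --conf spark.executor.cores=8 --conf spark.task.cpus=4.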
(*) Given that the concern here is at the level of each individual element x of the RDD, the only other setting that could affect memory usage is a parallelism level lower than the number of CPUs of a single executor, but that would severely under-utilize the cluster, as all workers but one would sit idle.