I am new to Spark. How do I know which piece of code will run on the driver and which will run on the executors?
Do we always have to write code so that everything runs on the executors? Are there any recommendations or ways to make most of your code run on executors?
Update: As far as I understand, Transformations run on executors and Actions run on the driver because they need to return a value. So is it fine if the action runs on the driver, or should it also run on an executor? Where does the driver actually run? On the cluster?
The Driver process will run on the Master node of your cluster and the Executor processes run on the Worker nodes.
The central coordinator is called the Spark Driver, and it communicates with all the Workers. Each Worker node hosts one or more Executors, which are responsible for running Tasks. Executors register themselves with the Driver, so the Driver has information about all the Executors at all times.
Number of available executors = (total cores / cores per executor) = 150 / 5 = 30. Leaving 1 executor for the YARN ApplicationMaster gives --num-executors = 29. Number of executors per node = 30 / 10 = 3. Memory per executor = 64 GB / 3 ≈ 21 GB.
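The arithmetic above can be written down as a small helper. This is only a sketch: the figures in the text appear to assume a cluster of 10 nodes with 16 cores and 64 GB each, with 1 core per node kept back for the OS and daemons (hence 150 usable cores). Those cluster figures are not stated in the text, so treat them as assumptions. The function name size_executors and its parameters are made up for illustration.

```python
def size_executors(nodes, cores_per_node, mem_gb_per_node,
                   cores_per_executor=5, reserved_cores_per_node=1):
    """Rough executor sizing, following the arithmetic in the text."""
    # Cores left after reserving some per node (assumption: 1 per node).
    usable_cores = nodes * (cores_per_node - reserved_cores_per_node)
    total_executors = usable_cores // cores_per_executor
    executors_per_node = total_executors // nodes
    # Integer division: the text rounds 64 / 3 down to 21 GB.
    mem_per_executor_gb = mem_gb_per_node // executors_per_node
    return {
        # One executor slot is left for the YARN ApplicationMaster.
        "num_executors": total_executors - 1,
        "executors_per_node": executors_per_node,
        "memory_per_executor_gb": mem_per_executor_gb,
    }


# Assumed cluster: 10 nodes, 16 cores and 64 GB per node.
plan = size_executors(nodes=10, cores_per_node=16, mem_gb_per_node=64)
```

In a real deployment you would usually also leave some headroom for off-heap memory overhead, so the value passed as executor memory tends to be somewhat lower than the raw figure computed here.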
In cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
Any Spark application consists of a single Driver process and one or more Executor processes. The Driver process will run on the Master node of your cluster and the Executor processes run on the Worker nodes. You can increase or decrease the number of Executor processes dynamically depending upon your usage but the Driver process will exist throughout the lifetime of your application.
The Driver process is responsible for a lot of things including directing the overall control flow of your application, restarting failed stages and the entire high level direction of how your application will process the data.
Coding your application so that more data is processed by Executors falls more under the purview of optimising your application so that it processes data more efficiently/faster making use of all the resources available to it in the cluster. In practice, you do not really need to worry about making sure that more of your data is being processed by executors.
That being said, some Actions, when triggered, necessarily move data around. If you call the collect action on an RDD, all the data is brought to the Driver process, and if the RDD holds a sufficiently large amount of data, the application will fail with an Out Of Memory error, because the single machine running the Driver process will not be able to hold all the data.
Keeping the above in mind, Transformations are lazy and Actions are not. Transformations basically transform one RDD into another. But calling a transformation on an RDD does not actually result in any data being processed anywhere, Driver or Executor. All a transformation does is that it adds to the DAG's lineage graph which will be executed when an Action is called.
So the actual processing happens when you call an Action on an RDD. The simplest example is calling collect. As soon as an action is called, Spark gets to work and executes the previously saved DAG computations on the specified RDD, returning the result. Where these computations are executed depends entirely on your application.
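To make the lazy-versus-eager distinction concrete, here is a toy model in plain Python. It is not Spark code and it runs in a single process. ToyRDD and double are hypothetical names invented for this sketch. It only shows that transformations record steps in a lineage, while the action is what actually runs them.

```python
class ToyRDD:
    """Toy stand-in for an RDD: records transformations, runs them on an action."""

    def __init__(self, data, lineage=()):
        self._data = data
        self._lineage = lineage  # tuple of (kind, function) steps

    def map(self, f):
        # Transformation: only extends the lineage, processes nothing.
        return ToyRDD(self._data, self._lineage + (("map", f),))

    def filter(self, f):
        # Transformation: only extends the lineage, processes nothing.
        return ToyRDD(self._data, self._lineage + (("filter", f),))

    def collect(self):
        # Action: replays the recorded lineage and returns the result.
        rows = list(self._data)
        for kind, f in self._lineage:
            if kind == "map":
                rows = [f(x) for x in rows]
            else:
                rows = [x for x in rows if f(x)]
        return rows


calls = []  # records every element that double() actually processes


def double(x):
    calls.append(x)
    return x * 2


pipeline = ToyRDD(range(5)).map(double).filter(lambda x: x > 2)
calls_before_action = len(calls)  # still 0: nothing has been processed yet
result = pipeline.collect()       # the action triggers the work
```

In real Spark the replay step is split into tasks that run on executors, and only the final result of collect is shipped back to the driver.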
There is no simple and straightforward answer here.
As a rule of thumb, everything that is executed inside closures of higher-order functions like mapPartitions (map, filter, flatMap) or combineByKey should be handled mostly by executor machines. Everything outside these is handled by the driver. But you have to be aware that this is a serious simplification.
Depending on the specific method and language, at least a part of the job can be handled by the driver. For example, when you use combine-like methods (reduce, aggregate), the final merging is applied locally on the driver machine. Complex algorithms (like many ML / MLlib tools) can interleave distributed and local processing when needed.
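A rough way to picture the two phases of a reduce-style action is the plain-Python sketch below. It is a single-process simulation, not Spark code. The function toy_reduce and the sample partitions are made up for illustration, and it assumes at least one non-empty partition.

```python
from functools import reduce
import operator


def toy_reduce(partitions, op):
    """Simulate reduce: per-partition work first, then a local merge."""
    # "Executor side": each non-empty partition is reduced independently.
    partials = [reduce(op, part) for part in partitions if part]
    # "Driver side": one partial result per partition is merged locally.
    return reduce(op, partials), len(partials)


partitions = [[1, 2, 3], [4, 5], [6], [7, 8, 9, 10]]
total, merged_on_driver = toy_reduce(partitions, operator.add)
```

The point is that the number of values the driver has to merge grows with the number of partitions, even though the bulk of the work happens elsewhere.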
Moreover, data processing is only a fraction of the whole job. The driver is responsible for bookkeeping, accumulator processing, initial broadcasting and other secondary tasks. It also handles lineage and DAG processing, and generates execution plans for higher-level APIs (Dataset, SparkSQL).
While the whole picture is relatively complex, in practice your choices are relatively limited. You can:
- Avoid collecting data (collect, toLocalIterator) to process locally.
- Perform more work on the executors with the tree* (treeAggregate, treeReduce) methods.
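The idea behind the tree* methods can be sketched in plain Python as follows. This is a simplified single-process model, not how Spark implements treeReduce internally. The function toy_tree_reduce and the fan_in parameter are hypothetical. It assumes at least one non-empty partition.

```python
from functools import reduce
import operator


def toy_tree_reduce(partitions, op, fan_in=2):
    """Simulate a tree-style reduce that merges partials in levels."""
    if fan_in < 2:
        raise ValueError("fan_in must be at least 2")
    # "Executor side": one partial result per non-empty partition.
    partials = [reduce(op, part) for part in partitions if part]
    # Still "executor side": merge partials in groups until few remain.
    while len(partials) > fan_in:
        partials = [reduce(op, partials[i:i + fan_in])
                    for i in range(0, len(partials), fan_in)]
    # "Driver side": only the last handful of values is merged locally.
    return reduce(op, partials), len(partials)


# 8 partitions covering the numbers 0..15.
partitions = [[i, i + 1] for i in range(0, 16, 2)]
total, merged_on_driver = toy_tree_reduce(partitions, operator.add, fan_in=2)
```

Here the simulated driver merges 2 values instead of 8, at the cost of extra intermediate rounds. That is roughly the trade-off you accept when choosing a tree* method over a plain reduce or aggregate.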