What is the meaning of the title "Locality Level" and the 5 statuses: Data local --> process local --> node local --> rack local --> Any?
Data locality in Spark lets the scheduler run compute or caching tasks on the machines where the data already resides. The concept comes from Hadoop MapReduce, where the location of data in HDFS is used to place map operations, which avoids moving data over the network.
spark.locality.wait is set to 3 seconds by default. Spark waits up to this long to launch a task on an executor that is local to the data. If the data-local node is still unavailable after this period, Spark gives up and launches the task on another, less-local node.
NODE_LOCAL - data and processing are on the same node but in different executors. This level is slower than the previous one (PROCESS_LOCAL) because the data has to move between processes.
RACK_LOCAL - data is located on a different node from the processing, but both nodes are on the same rack.
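To make the level definitions concrete, here is a minimal, self-contained sketch that classifies a task's locality from where the task runs and where its data lives. The names `Placement` and `localityOf` are hypothetical and exist only for this illustration; this is a toy model of the definitions above, not Spark's scheduler code.

```scala
object LocalityLevelSketch {
  // Hypothetical description of where a task or a block of data lives.
  final case class Placement(executorId: String, host: String, rack: String)

  // Returns the closest level that matches, checking from most local to least local.
  def localityOf(task: Placement, data: Placement): String =
    if (task.host == data.host && task.executorId == data.executorId) "PROCESS_LOCAL"
    else if (task.host == data.host) "NODE_LOCAL"   // same machine, different executor
    else if (task.rack == data.rack) "RACK_LOCAL"   // different machine, same rack
    else "ANY"                                      // elsewhere on the network
}
```

For example, a task on executor "exec-2" of "host-a" reading data held by "exec-1" on the same host would be classified as NODE_LOCAL.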
As far as I know, the locality level indicates which type of data access a task performed. When a node finishes all its work and its CPU becomes idle, Spark may decide to start other pending tasks that require obtaining data from other places. So ideally all your tasks should be process local, since that is associated with the lowest data access latency.
You can configure the wait time before moving to other locality levels using:
spark.locality.wait
More information about this and related parameters can be found in the Spark Configuration docs.
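As a rough sketch of how this setting might be applied from application code, the fragment below sets the base wait and the per-level overrides on a SparkConf. It assumes spark-core is on the classpath; the application name and the chosen values are placeholders, not recommendations.

```scala
import org.apache.spark.SparkConf

// Illustrative values only; "locality-wait-example" is a placeholder app name.
val conf = new SparkConf()
  .setAppName("locality-wait-example")
  .set("spark.locality.wait", "3s")          // base wait used for every level unless overridden
  .set("spark.locality.wait.process", "3s")  // wait before giving up on PROCESS_LOCAL
  .set("spark.locality.wait.node", "3s")     // wait before giving up on NODE_LOCAL
  .set("spark.locality.wait.rack", "3s")     // wait before giving up on RACK_LOCAL
```

The same keys can also be passed on the command line or in the defaults file; the per-level keys fall back to spark.locality.wait when they are not set.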
With respect to the different levels (PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, and ANY), I think the methods findTask and findSpeculativeTask in org.apache.spark.scheduler.TaskSetManager illustrate how Spark chooses tasks based on their locality level. It first checks for PROCESS_LOCAL tasks, which will be launched in the same executor process as their data. If there are none, it checks for NODE_LOCAL tasks, whose data may be in other executors on the same node or may need to be retrieved from systems like HDFS, a cache, etc. RACK_LOCAL means that the data is on another node and therefore needs to be transferred prior to execution. Finally, ANY just takes any pending task that may run on the current node.
/**
 * Dequeue a pending task for a given node and return its index and locality level.
 * Only search for tasks matching the given locality constraint.
 */
private def findTask(execId: String, host: String, locality: TaskLocality.Value)
  : Option[(Int, TaskLocality.Value)] =
{
  for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
    return Some((index, TaskLocality.PROCESS_LOCAL))
  }

  if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
    for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
      return Some((index, TaskLocality.NODE_LOCAL))
    }
  }

  if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
    for {
      rack <- sched.getRackForHost(host)
      index <- findTaskFromList(execId, getPendingTasksForRack(rack))
    } {
      return Some((index, TaskLocality.RACK_LOCAL))
    }
  }

  // Look for no-pref tasks after rack-local tasks since they can run anywhere.
  for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
    return Some((index, TaskLocality.PROCESS_LOCAL))
  }

  if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
    for (index <- findTaskFromList(execId, allPendingTasks)) {
      return Some((index, TaskLocality.ANY))
    }
  }

  // Finally, if all else has failed, find a speculative task
  findSpeculativeTask(execId, host, locality)
}
Here are my two cents, summarized mostly from the official Spark guide.
First, I want to add one more locality level, NO_PREF, which has been discussed in this thread.
Then, putting those levels together in order from closest to farthest: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY.
Note that a specific level can be skipped, as described in the Spark configuration guide. For instance, if you want to skip NODE_LOCAL, just set spark.locality.wait.node to 0.
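The effect of a zero wait can be illustrated with a small, self-contained toy model. It assumes the scheduler relaxes one level at a time once the wait configured for the current level has elapsed; `LocalityWaitSketch` and `allowedLevel` are hypothetical names, NO_PREF is left out for brevity, and this is a simplification rather than Spark's actual TaskSetManager logic.

```scala
object LocalityWaitSketch {
  // Levels from closest to farthest; NO_PREF is omitted to keep the model small.
  val levels: Vector[String] = Vector("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")

  /**
   * waitsMs(i) is how long the model waits at levels(i) before relaxing to levels(i + 1),
   * so it has one entry fewer than levels (nothing is waited on at ANY).
   */
  def allowedLevel(elapsedMs: Long, waitsMs: Vector[Long]): String = {
    require(waitsMs.length == levels.length - 1, "one wait per level except ANY")
    var remaining = elapsedMs
    var idx = 0
    // Consume each level's wait in turn; a wait of 0 is consumed immediately.
    while (idx < waitsMs.length && remaining >= waitsMs(idx)) {
      remaining -= waitsMs(idx)
      idx += 1
    }
    levels(idx)
  }
}
```

With waits of 3000, 3000, and 3000 ms, three seconds of waiting relaxes the model to NODE_LOCAL. With the node wait set to 0 (waits of 3000, 0, and 3000 ms), the same three seconds relaxes it straight to RACK_LOCAL, which is the "skip" behavior described above.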