
What's the meaning of "Locality Level"on Spark cluster

What is the meaning of the title "Locality Level", and of the 5 statuses data local --> process local --> node local --> rack local --> any?

[Screenshot of the Spark UI showing the Locality Level column]

asked Nov 18 '14 by fanhk


People also ask

How does data locality work in Spark?

Data locality in Spark helps the scheduler run compute or caching tasks on the machines where the data is already available. The concept comes from Hadoop MapReduce, where the location of data in HDFS is used to place map operations, avoiding data movement over the network.

What is Spark locality wait?

By default, spark.locality.wait is set to 3 seconds. Spark will wait this long to launch a task on an executor local to the data. If a data-local slot is still unavailable after this period, Spark will give up and launch the task at a less-local level.

What is node local and rack local?

NODE_LOCAL - data and processing are on the same node but in different executors. This level is slower than the previous one because the data has to be moved between processes. RACK_LOCAL - data is located on a different node than the processing, but both nodes are on the same rack.


2 Answers

As far as I know, the locality level indicates which type of access to the data has been performed. When a node finishes all its work and its CPU becomes idle, Spark may decide to start other pending tasks that require obtaining data from other places. So ideally, all your tasks should be process local, as that is associated with the lowest data access latency.
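To see which level each task actually ran at (beyond the Spark UI column), here is a minimal sketch using the standard SparkListener API; it assumes an existing SparkContext named sc:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}

    // Log the locality level of every task as it starts.
    sc.addSparkListener(new SparkListener {
      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
        val info = taskStart.taskInfo
        println(s"Task ${info.taskId} started at locality ${info.taskLocality}")
      }
    })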

You can configure the wait time before moving to other locality levels using:

spark.locality.wait 

More information about these parameters can be found in the Spark configuration docs.
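For example, here is a minimal sketch of setting these waits when building a SparkSession (the app name and durations are illustrative, not recommendations):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("locality-wait-demo")
      // Global fallback wait before stepping down a locality level (default: 3s).
      .config("spark.locality.wait", "3s")
      // Per-level overrides for PROCESS_LOCAL, NODE_LOCAL, and RACK_LOCAL.
      .config("spark.locality.wait.process", "3s")
      .config("spark.locality.wait.node", "3s")
      .config("spark.locality.wait.rack", "3s")
      .getOrCreate()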

With respect to the different levels PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, and ANY, I think the methods findTask and findSpeculativeTask in org.apache.spark.scheduler.TaskSetManager illustrate how Spark chooses tasks based on their locality level. It will first check for PROCESS_LOCAL tasks, which are launched in the same executor process as the data. If there are none, it will check for NODE_LOCAL tasks, whose data may be in another executor on the same node or may need to be retrieved from a node-local system such as HDFS or a cache. RACK_LOCAL means that the data is on another node in the same rack and therefore needs to be transferred over the network prior to execution. Finally, ANY just takes any pending task that may run on the current node.

    /**
     * Dequeue a pending task for a given node and return its index and locality level.
     * Only search for tasks matching the given locality constraint.
     */
    private def findTask(execId: String, host: String, locality: TaskLocality.Value)
      : Option[(Int, TaskLocality.Value)] =
    {
      for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
        return Some((index, TaskLocality.PROCESS_LOCAL))
      }

      if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
        for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
          return Some((index, TaskLocality.NODE_LOCAL))
        }
      }

      if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
        for {
          rack <- sched.getRackForHost(host)
          index <- findTaskFromList(execId, getPendingTasksForRack(rack))
        } {
          return Some((index, TaskLocality.RACK_LOCAL))
        }
      }

      // Look for no-pref tasks after rack-local tasks since they can run anywhere.
      for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
        return Some((index, TaskLocality.PROCESS_LOCAL))
      }

      if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
        for (index <- findTaskFromList(execId, allPendingTasks)) {
          return Some((index, TaskLocality.ANY))
        }
      }

      // Finally, if all else has failed, find a speculative task
      findSpeculativeTask(execId, host, locality)
    }
answered Oct 03 '22 by Daniel H.


Here are my two cents, summarized mostly from the official Spark guide.

First, I want to add one more locality level, NO_PREF, which has been discussed in this thread.
Then let's put those levels together in a single table:

    PROCESS_LOCAL  data is in the same JVM as the running code (best)
    NODE_LOCAL     data is on the same node, e.g. in HDFS or in another executor on the node
    NO_PREF        data is accessed equally quickly from anywhere; no locality preference
    RACK_LOCAL     data is on the same rack and travels over the rack switch
    ANY            data is elsewhere on the network, not on the same rack (worst)

Note that a specific level can be skipped, following the guidance in the Spark configuration docs.

For instance, if you want to skip NODE_LOCAL, just set spark.locality.wait.node to 0.
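As a minimal sketch (the app name is illustrative), that looks like:

    import org.apache.spark.SparkConf

    // Disable the NODE_LOCAL wait so the scheduler falls through to the
    // next locality level immediately instead of waiting for a node-local slot.
    val conf = new SparkConf()
      .setAppName("skip-node-local") // hypothetical app name
      .set("spark.locality.wait.node", "0")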

answered Oct 03 '22 by Eugene