 

What factors decide the number of executors in standalone mode?

Given a Spark application

  1. What factors decide the number of executors in standalone mode? For Mesos and YARN, according to the documentation, we can specify the number of executors/cores and the memory.

  2. Once a number of executors are started, does Spark start the tasks in a round-robin fashion, or is it smart enough to see whether some executors are idle/busy and schedule the tasks accordingly?

  3. Also, how does Spark decide on the number of tasks? I wrote a simple max-temperature program with a small dataset, and Spark spawned two tasks in a single executor. This was in Spark standalone mode.

asked Sep 19 '14 by Praveen Sripati




3 Answers

Answering your questions:

  1. Standalone mode uses the same configuration variables as the Mesos and YARN modes to set the number of executors. The variable spark.cores.max defines the maximum number of cores used in the Spark context. The default value is infinity, so Spark will use all the cores in the cluster. The spark.task.cpus variable defines how many CPUs Spark allocates for a single task; the default value is 1. Together, these two variables determine the maximum number of parallel tasks in your cluster (see the configuration sketch after this list).

  2. When you create an RDD subclass, you can define on which machines your tasks should run. This is done in the getPreferredLocations method. But, as the method signature suggests, this is only a preference, so if Spark detects that a machine is idle, it will launch the task on that machine. I don't know the exact mechanism Spark uses to determine which machines are idle, however. To achieve locality, we (Stratio) decided to make each partition smaller, so each task takes less time and locality is easier to achieve.

  3. The number of tasks of each Spark operation is defined by the length of the RDD's partitions array. This array is the result of the getPartitions method, which you have to override if you want to develop a new RDD subclass. This method describes how the RDD is split, where the data is, and the partitions themselves (see the RDD sketch at the end of this answer). When you combine two or more RDDs, the partition count of the result depends on the operation: union concatenates the parents' partitions (an RDD with 100 partitions unioned with one with 1000 yields 1100), while join typically uses the partition count of the largest parent (here 1000), unless an explicit partitioner or spark.default.parallelism dictates otherwise. Note that a high number of partitions is not necessarily synonymous with more data.
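
To illustrate point 1, here is a minimal sketch of setting both properties on a SparkConf; the master URL and application name are placeholders:

  import org.apache.spark.{SparkConf, SparkContext}

  // Minimal sketch: the master URL and application name are placeholders.
  val conf = new SparkConf()
    .setMaster("spark://master:7077")     // standalone cluster manager
    .setAppName("MaxTemperature")
    .set("spark.cores.max", "8")          // cap on the total cores taken by this application
    .set("spark.task.cpus", "1")          // cores reserved per task (the default)

  val sc = new SparkContext(conf)
  // With these values, at most 8 / 1 = 8 tasks can run in parallel across the cluster.

The same properties can also be set at submit time, e.g. --conf spark.cores.max=8 with spark-submit.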

I hope this will help.
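
As a purely hypothetical illustration of points 2 and 3: a minimal RDD subclass only needs getPartitions (one partition = one task) and compute, plus an optional getPreferredLocations hint. The class, partition type, and host names below are invented for this sketch:

  import org.apache.spark.{Partition, SparkContext, TaskContext}
  import org.apache.spark.rdd.RDD

  // Hypothetical partition type: each partition covers a range of integers.
  class RangePartition(override val index: Int, val start: Int, val end: Int)
    extends Partition

  // Hypothetical RDD producing the numbers 0 until n, split into numSlices partitions.
  class RangeRDD(sc: SparkContext, n: Int, numSlices: Int)
    extends RDD[Int](sc, Nil) {

    // One entry here == one task when this RDD is computed (point 3).
    override def getPartitions: Array[Partition] =
      Array.tabulate[Partition](numSlices) { i =>
        new RangePartition(i, i * n / numSlices, (i + 1) * n / numSlices)
      }

    override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
      val p = split.asInstanceOf[RangePartition]
      (p.start until p.end).iterator
    }

    // Only a preference (point 2): Spark may still run the task on another, idle
    // executor. The host names are invented for the sketch.
    override def getPreferredLocations(split: Partition): Seq[String] =
      Seq(s"worker-${split.index % 2}.example.com")
  }

The scheduler will wait up to spark.locality.wait for a slot on a preferred host before falling back to any free executor.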

answered Oct 17 '22 by jlopezmat


I agree with @jlopezmat about how Spark chooses its configuration. With respect to your test code, you are seeing two tasks due to the way textFile is implemented. From SparkContext.scala:

  /**
   * Read a text file from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI, and return it as an RDD of Strings.
   */
  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString)
  }

and if we check the value of defaultMinPartitions:

  /** Default min number of partitions for Hadoop RDDs when not given by user */
  def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
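
So with the default settings and a small input file, textFile produces just two partitions, and each partition becomes one task, which matches the two tasks observed in the question. If you want more input parallelism, you can pass minPartitions explicitly; a small sketch, assuming sc is your SparkContext and with a placeholder path:

  // Placeholder path; ask for at least 8 input splits instead of the default 2.
  val temps = sc.textFile("hdfs:///data/temperatures.txt", 8)
  println(temps.partitions.length)   // typically 8 or more, depending on split boundaries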

answered Oct 17 '22 by Daniel H.


Spark chooses the number of tasks based on the number of partitions in the original data set. If you are using HDFS as your data source, then the number of partitions will, by default, be equal to the number of HDFS blocks. You can change the number of partitions in several ways; the top two are passing an extra argument to the SparkContext.textFile method and calling the RDD.repartition method, as sketched below.
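
A brief sketch of both techniques, assuming sc is your SparkContext and using a placeholder path:

  // Ask textFile for at least 16 partitions up front...
  val lines = sc.textFile("hdfs:///data/input.txt", 16)

  // ...or reshuffle an existing RDD into exactly 4 partitions afterwards.
  val fewer = lines.repartition(4)
  println(fewer.partitions.length)   // 4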

answered Oct 17 '22 by David