Method to get number of cores for an executor on a task node?

E.g. I need to get a list of all available executors and their respective multithreading capacity (NOT the total multithreading capacity, which sc.defaultParallelism already handles).

Since this parameter is implementation-dependent (YARN and Spark standalone have different strategies for allocating cores) and situational (it may fluctuate because of dynamic allocation and long-running jobs), I cannot estimate it by other means. Is there a way to retrieve this information using the Spark API in a distributed transformation (e.g. TaskContext, SparkEnv)?

UPDATE: As of Spark 1.6, I have tried the following methods:

1) Run a 1-stage job with many partitions (>> defaultParallelism) and count the number of distinct thread IDs per executor ID:

import org.apache.spark.SparkEnv

val n = sc.defaultParallelism * 16
sc.parallelize(1 to n, n)
  .map(v => SparkEnv.get.executorId -> Thread.currentThread().getId)
  .groupByKey()
  .mapValues(_.toSeq.distinct)
  .collect()

This however leads to an estimate higher than the actual multithreading capacity, because each Spark executor uses an overprovisioned thread pool.

2) Similar to 1, except that n = defaultParallelism, and in every task I add a delay to prevent the resource negotiator from imbalanced sharding (a fast node completes its task and asks for more before slow nodes can start running):

val n = sc.defaultParallelism
sc.parallelize(1 to n, n)
  .map { v =>
    Thread.sleep(5000)
    SparkEnv.get.executorId -> Thread.currentThread().getId
  }
  .groupByKey()
  .mapValues(_.toSeq.distinct)
  .collect()

It works most of the time, but is much slower than necessary and may be broken by a very imbalanced cluster or by task speculation.

3) I haven't tried this: use Java reflection to read BlockManager.numUsableCores. This is obviously not a stable solution; the internal implementation may change at any time.
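
For what it's worth, a minimal untested sketch of that reflection idea; the field name numUsableCores and its visibility are assumptions that depend on the Spark version (it may be absent or renamed):

import org.apache.spark.SparkEnv

// Untested sketch: numUsableCores is assumed to be a private, version-dependent
// field of the executor-side BlockManager; this may throw NoSuchFieldException.
val blockManager = SparkEnv.get.blockManager
val field = blockManager.getClass.getDeclaredField("numUsableCores")
field.setAccessible(true)
val numUsableCores = field.getInt(blockManager)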

Please tell me if you have found something better.

asked Jul 20 '17 by tribbloid



2 Answers

It is pretty easy with the Spark REST API. You have to get the application ID:

val applicationId = spark.sparkContext.applicationId

the UI URL:

val baseUrl = spark.sparkContext.uiWebUrl

and build the query URL:

val url = baseUrl.map { url => 
  s"${url}/api/v1/applications/${applicationId}/executors"
}

With the Apache HTTP library (already among Spark's dependencies; adapted from https://alvinalexander.com/scala/scala-rest-client-apache-httpclient-restful-clients):

import org.apache.http.impl.client.DefaultHttpClient
import org.apache.http.client.methods.HttpGet
import scala.util.Try

val client = new DefaultHttpClient()

// Fetch the executors endpoint and read the JSON body as a single string
val response = url
  .flatMap(url => Try(client.execute(new HttpGet(url))).toOption)
  .flatMap(response => Try {
    val s = response.getEntity().getContent()
    val json = scala.io.Source.fromInputStream(s).getLines.mkString
    s.close()
    json
  }.toOption)

and json4s:

import org.json4s._
import org.json4s.jackson.JsonMethods._
implicit val formats = DefaultFormats

case class ExecutorInfo(hostPort: String, totalCores: Int)

val executors: Option[List[ExecutorInfo]] = response.flatMap(json => Try {
  parse(json).extract[List[ExecutorInfo]]
}.toOption)
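
From there it is easy to inspect per-executor capacity. Note that the endpoint also returns an entry for the driver, which you may want to filter out:

executors.getOrElse(Nil).foreach { e =>
  println(s"${e.hostPort}: ${e.totalCores} cores")
}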

As long as you keep the application ID and UI URL at hand, and open the UI port to external connections, you can do the same thing from any task.

answered Nov 03 '22 by Alper t. Turker


I would try to implement a SparkListener, in a way similar to what the web UI does. This code might be helpful as an example.
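
A minimal sketch of that idea (assuming the Spark 2.x listener API; ExecutorCoresListener is just an illustrative name), tracking totalCores per executor on the driver, with dynamic allocation handled by the remove callback:

import scala.collection.concurrent.TrieMap
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorRemoved}

class ExecutorCoresListener extends SparkListener {
  // executorId -> totalCores, kept up to date as executors come and go
  val coresByExecutor = TrieMap.empty[String, Int]

  override def onExecutorAdded(added: SparkListenerExecutorAdded): Unit =
    coresByExecutor(added.executorId) = added.executorInfo.totalCores

  override def onExecutorRemoved(removed: SparkListenerExecutorRemoved): Unit =
    coresByExecutor -= removed.executorId
}

val listener = new ExecutorCoresListener
sc.addSparkListener(listener)
// later: listener.coresByExecutor gives the live map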

answered Nov 03 '22 by Vitalii Kotliarenko