For example, I need to get a list of all available executors and their respective multithreading capacity (NOT the total multithreading capacity; sc.defaultParallelism already handles that).
Since this parameter is implementation-dependent (YARN and Spark standalone have different strategies for allocating cores) and situational (it may fluctuate because of dynamic allocation and long-running jobs), I cannot use any other method to estimate it. Is there a way to retrieve this information using the Spark API in a distributed transformation (e.g. TaskContext, SparkEnv)?
UPDATE: As of Spark 1.6, I have tried the following methods:
1) Run a 1-stage job with many partitions ( >> defaultParallelism ) and count the number of distinct thread IDs for each executor ID:
import org.apache.spark.SparkEnv

val n = sc.defaultParallelism * 16
sc.parallelize(1 to n, n)   // parallelize needs a Seq, not a bare count
  .map(v => SparkEnv.get.executorId -> Thread.currentThread().getId)
  .groupByKey()
  .mapValues(_.toSeq.distinct)
  .collect()
This, however, yields an estimate higher than the actual multithreading capacity, because each Spark executor uses an over-provisioned thread pool.
2) Similar to 1), except that n = defaultParallelism, and every task adds a delay to prevent the resource negotiator from imbalanced sharding (a fast node completes its task and asks for more before slow nodes can start running):
val n = sc.defaultParallelism
sc.parallelize(1 to n, n)
  .map { v =>
    Thread.sleep(5000)   // give every executor time to claim at least one task
    SparkEnv.get.executorId -> Thread.currentThread().getId
  }
  .groupByKey()
  .mapValues(_.toSeq.distinct)
  .collect()
It works most of the time, but is much slower than necessary and may be broken by a very imbalanced cluster or by task speculation.
3) I haven't tried this: use Java reflection to read BlockManager.numUsableCores. This is obviously not a stable solution; the internal implementation may change at any time.
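For completeness, a rough sketch of what 3) could look like (untested; the private field name numUsableCores is an assumption and may be renamed or name-mangled in other Spark versions):

import org.apache.spark.SparkEnv

// Untested sketch: read BlockManager's private numUsableCores via reflection.
// The field is an internal implementation detail, so this may break between versions.
val blockManager = SparkEnv.get.blockManager
val field = blockManager.getClass.getDeclaredField("numUsableCores")
field.setAccessible(true)
val numUsableCores = field.getInt(blockManager)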
Please tell me if you have found something better.
According to the recommendations discussed above: leave 1 core per node for the Hadoop/YARN daemons => cores available per node = 16 - 1 = 15. So the total available cores in the cluster = 15 x 10 = 150, and the number of available executors = (total cores / num-cores-per-executor) = 150 / 5 = 30.
The consensus in most Spark tuning guides is that 5 cores per executor is the optimum number of cores in terms of parallel processing.
The --num-executors YARN flag controls the number of executors requested. One executor is created on each node allocated with Slurm when using Spark in standalone mode (so that 5 executors would be created in the above example).
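Written out as code, the same back-of-the-envelope calculation looks like this (the node and core counts are just the figures from the example above):

// 10 nodes with 16 cores each, as in the example above
val numNodes = 10
val coresPerNode = 16
val coresPerExecutor = 5                                       // the commonly recommended value

val usableCoresPerNode = coresPerNode - 1                      // leave 1 core for Hadoop/YARN daemons => 15
val totalUsableCores   = usableCoresPerNode * numNodes         // 15 * 10 = 150
val numExecutors       = totalUsableCores / coresPerExecutor   // 150 / 5 = 30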
It is pretty easy with the Spark REST API. You have to get the application ID:
val applicationId = spark.sparkContext.applicationId
the UI URL:
val baseUrl = spark.sparkContext.uiWebUrl
and the query URL:
val url = baseUrl.map { url =>
  s"${url}/api/v1/applications/${applicationId}/executors"
}
With the Apache HTTP library (already among Spark's dependencies; adapted from https://alvinalexander.com/scala/scala-rest-client-apache-httpclient-restful-clients):
import org.apache.http.impl.client.DefaultHttpClient
import org.apache.http.client.methods.HttpGet
import scala.util.Try

val client = new DefaultHttpClient()

val response = url
  .flatMap(url => Try { client.execute(new HttpGet(url)) }.toOption)
  .flatMap(response => Try {
    // read the response body into a single JSON string
    val s = response.getEntity().getContent()
    val json = scala.io.Source.fromInputStream(s).getLines.mkString
    s.close
    json
  }.toOption)
and json4s:
import org.json4s._
import org.json4s.jackson.JsonMethods._
implicit val formats = DefaultFormats
case class ExecutorInfo(hostPort: String, totalCores: Int)
val executors: Option[List[ExecutorInfo]] = response.flatMap(json => Try {
  parse(json).extract[List[ExecutorInfo]]
}.toOption)
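From there it is a one-liner to get the per-executor core counts the question asks for:

// hostPort -> number of cores for every executor reported by the REST API
val coresByExecutor: Map[String, Int] =
  executors.getOrElse(Nil).map(e => e.hostPort -> e.totalCores).toMap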
As long as you keep the application ID and UI URL at hand, and open the UI port to external connections, you can do the same thing from any task.
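For instance, a minimal sketch of doing it from inside a transformation (it just ships the url built above to the executors via closure capture and fetches the raw JSON there; it assumes the UI port is reachable from the worker nodes):

// runs on the driver: fail early if the UI (and hence the REST API) is disabled
val endpoint = url.getOrElse(sys.error("Spark UI is disabled, no REST endpoint available"))

val executorJson: String = spark.sparkContext
  .parallelize(Seq(1), 1)
  .map { _ =>
    // runs inside a task: call back to the driver's REST API
    scala.io.Source.fromURL(endpoint).mkString
  }
  .collect()
  .head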
I would try to implement a SparkListener, in a way similar to what the web UI does. This code might be helpful as an example.
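For example, a minimal sketch of that idea (the tracker and listener names are mine; the map is a driver-side view kept up to date from executor add/remove events):

import scala.collection.concurrent.TrieMap
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorRemoved}

// driver-side view of executorId -> total cores, maintained from listener events
object ExecutorCoreTracker {
  val coresByExecutor = TrieMap.empty[String, Int]
}

class ExecutorCoreListener extends SparkListener {
  override def onExecutorAdded(added: SparkListenerExecutorAdded): Unit =
    ExecutorCoreTracker.coresByExecutor(added.executorId) = added.executorInfo.totalCores

  override def onExecutorRemoved(removed: SparkListenerExecutorRemoved): Unit =
    ExecutorCoreTracker.coresByExecutor -= removed.executorId
}

// register it once, right after creating the SparkContext:
// sc.addSparkListener(new ExecutorCoreListener)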