I implement Apache Spark's "Scheduling Within an Application" like this (Scala code):
// group into list of 10 items...
val maxSimultaneousSubmitAndMonitorThreadsInDriver = 10
// ... in order to throttle the number of threads submitting and monitoring apps at a time
val lists = myList grouped maxSimultaneousThreadsInDriver
for (aList <- lists) {
  // pick a list, then convert it to a Scala parallel collection
  aList.par.foreach { // so 10 threads MAX at a time can handle job submission and monitoring
    case filename => {
      // in each driver thread, create a separate Spark session
      val sparkChild = sparkMain.newSession()
      // then do specific stuff with that session
      val childDF = sparkChild.read.parquet(filename + "_directory/*.parquet")
      ...
    }
  }
}
As you know, with Scheduling Within an Application a single driver instance can submit and monitor multiple Spark jobs concurrently, so I can have several of these small "apps" running simultaneously. (In my case, each one does very specific tasks per file read, based on the file name and depending on business rules.)
The scheduler runs in FIFO mode by default:
By default, Spark’s scheduler runs jobs in FIFO fashion. Each job is divided into “stages” (e.g. map and reduce phases), and the first job gets priority on all available resources while its stages have tasks to launch, then the second job gets priority, etc. If the jobs at the head of the queue don’t need to use the whole cluster, later jobs can start to run right away [...]
This solution works for me. However, I find Scheduling Within an Application a bit slow. For instance, when I look at the Executors tab of the Spark UI, I can see that only a few cores are in use most of the time.
This is the opposite of my classical Spark apps, which naturally use the CPU fully almost all of the time!
So my question is: how can I optimize the performance of Scheduling Within an Application?
What I tried (tuning the following settings):
maxSimultaneousSubmitAndMonitorThreadsInDriver, in order to throttle the number of threads that are submitting and monitoring an app at a given time
spark.scheduler.listenerbus.eventqueue.capacity
spark.default.parallelism
spark.sql.shuffle.partitions
If I increase the number of threads that can submit and monitor Spark apps simultaneously (with the throttle system), I end up with an OOM error.
Regarding spark.default.parallelism and spark.sql.shuffle.partitions, I don't know how to choose a relevant value. If I do NOT use Scheduling Within an Application (with only one application per driver), the value I would set is probably 192 (the number of cores) to get good results.
But with Scheduling Within an Application it's unclear. Each submitted job is small, and a parallelism of 192 for each job seems overkill (and slow?).
Any input would be greatly appreciated.
First, you define maxSimultaneousSubmitAndMonitorThreadsInDriver = 10 and then use maxSimultaneousThreadsInDriver instead of the one you just declared. Is this on purpose?
Second, try removing the line val sparkChild = sparkMain.newSession() and changing the next line to val childDF = sparkMain.read.parquet( filename + "_directory/*.parquet") instead. Does it compile? If it does, leave it that way and check again.
Did you try increasing the executor count?
Add --num-executors 20 to your spark-submit command (or change the value if the parameter is already there). If you create the context in code, add conf.set("spark.executor.instances", "20") right before the new SparkContext(conf) line.
Now run again. Does performance improve? If it does, but not enough, increase the value to 40.
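As a rough sketch of the in-code variant: SparkConf.set only accepts String values, so the number is passed as "20". The application name below is a placeholder, and the snippet assumes the master URL is supplied by spark-submit. As far as I know, spark.executor.instances is only honored by cluster managers that support a fixed executor count (YARN, for example), so treat this as something to verify on your setup.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// SparkConf.set(key, value) takes Strings, so numeric settings are quoted.
val conf = new SparkConf()
  .setAppName("scheduling-within-demo") // hypothetical application name
  .set("spark.executor.instances", "20")

// Build the main session from that configuration (master comes from spark-submit).
val sparkMain = SparkSession.builder().config(conf).getOrCreate()
```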
If you are still stuck, keep reading.
The default Spark job scheduling behavior is FIFO, i.e. the first job is prioritized, and later jobs run only when the first job does not need all the resources or has released them.
I guess that you are getting only 14 tasks (7 in each executor) because your files are very small. If the tasks run fast, repartitioning will not solve the problem, but allowing jobs to run in parallel will.
Since you are looking for parallelism between your jobs, I would suggest using the FAIR scheduler and assigning a different pool to each thread/job you create.
Configure FAIR sharing for your Spark application by adding --conf spark.scheduler.mode=FAIR to your spark-submit command. If you create the context in code, add conf.set("spark.scheduler.mode", "FAIR") right before the new SparkContext(conf) line.
Assign a random pool name before any job execution inside your thread (you could use the thread ID, but different pool names are recommended for each job, even within the same thread):
val randomString = scala.util.Random.alphanumeric.take(10).mkString("")
sparkMain.sparkContext.setLocalProperty("spark.scheduler.pool", randomString)
val childDF = sparkMain.read.parquet( filename + "_directory/*.parquet")
Now FAIR sharing should kick in and split the resources equally between the threads.
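To make the FAIR setup concrete, here is a minimal, self-contained sketch. It is only an illustration under several assumptions: the local[4] master, the object and method names, and the spark.range(...).count() job (a stand-in for the per-file Parquet work) are all made up for the example. The pool is set through SparkContext.setLocalProperty, which applies to the calling thread, and is cleared afterwards so a reused thread does not keep the old pool.

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Random

object FairPoolSketch {
  // Runs one small job in its own randomly named scheduler pool.
  def countInOwnPool(spark: SparkSession, label: String): Long = {
    val sc = spark.sparkContext
    val poolName = s"$label-${Random.alphanumeric.take(10).mkString}"
    sc.setLocalProperty("spark.scheduler.pool", poolName) // thread-local setting
    try spark.range(0, 1000).count() // stand-in for the real per-file work
    finally sc.setLocalProperty("spark.scheduler.pool", null) // reset for this thread
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("fair-pool-sketch")            // hypothetical name
      .master("local[4]")                     // assumption: local run for the demo
      .config("spark.scheduler.mode", "FAIR")
      .getOrCreate()

    // One driver thread per (hypothetical) file name, each submitting its own job.
    val threads = Seq("file_a", "file_b", "file_c").map { name =>
      new Thread(() => println(s"$name -> ${countInOwnPool(spark, name)} rows"))
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    spark.stop()
  }
}
```

Pools that are not declared in a fair scheduler allocation file are created with default settings, so no extra configuration file should be needed for this sketch.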
If you still see low core usage, raise your maximum thread pool size as far as you can without running into OOM.
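One possible way to cap the number of driver threads, sketched here as an alternative to grouped + par and not as the method from the question: a fixed-size thread pool with Futures. With grouped batches, each batch waits for its slowest item before the next batch starts, whereas a fixed pool hands the next item to whichever thread frees up first. The names ThrottledRunner and runThrottled are hypothetical.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ThrottledRunner {
  // Applies `work` to every item with at most `maxThreads` items in flight.
  // Results come back in the same order as `items`.
  def runThrottled[A, B](items: Seq[A], maxThreads: Int)(work: A => B): Seq[B] = {
    val pool = Executors.newFixedThreadPool(maxThreads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // Every item is queued up front; the pool size limits real concurrency.
      val futures = items.map(item => Future(work(item)))
      Await.result(Future.sequence(futures), Duration.Inf)
    } finally pool.shutdown() // release the threads even if a task failed
  }
}
```

In the question's setting, the call would look roughly like ThrottledRunner.runThrottled(myList, 10)(filename => ...), with the per-file Spark work inside the function body.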
If it's still slow, consider repartitioning to (max_cores / max_threads) partitions. In your case (I saw 2 executors with 192 cores available, i.e. 384 in total), 384 / 10 ≈ 38, so repartition(38) may help.
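The arithmetic above can be written as a tiny helper. This is just a sketch of the rule of thumb; PartitionSizing and partitionsPerJob are hypothetical names, and the floor of 1 partition is my own assumption for the case where there are more threads than cores.

```scala
object PartitionSizing {
  // Share of the cluster's cores per concurrently running job (integer division),
  // never going below one partition.
  def partitionsPerJob(totalCores: Int, concurrentJobs: Int): Int = {
    require(totalCores > 0 && concurrentJobs > 0, "both arguments must be positive")
    math.max(1, totalCores / concurrentJobs)
  }
}
```

With the numbers from this thread, PartitionSizing.partitionsPerJob(2 * 192, 10) gives 38, which is the value you would pass to repartition.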
Reference: https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application