 

Apache Spark - How does the internal job scheduler in Spark define what users and pools are?

I am sorry for being a little general here, but I am a little confused about how job scheduling works internally in Spark. From the documentation here I gather that it is some sort of implementation of the Hadoop Fair Scheduler.

I am unable to understand who exactly the users are here (are they Linux users, Hadoop users, Spark clients?). I am also unable to understand how the pools are defined. For example, in my Hadoop cluster I have allocated resources to two different pools (let's call them team 1 and team 2). But in a Spark cluster, won't different pools and the users in them instantiate their own SparkContexts? Which again brings me to the question of what parameters I should pass when setting the spark.scheduler.pool property.

I have a basic understanding of how the driver instantiates a SparkContext and then splits the work into jobs and tasks. Maybe I am missing the point completely here, but I would really like to understand how Spark's internal scheduler works in the context of actions, tasks, and jobs.

asked Apr 23 '15 by Arpit1286


People also ask

What happens internally when we submit a Spark job?

The entire resource allocation and the tracking of the jobs and tasks are performed by the cluster manager. As soon as you do a spark-submit, your user program and the other configuration mentioned are copied onto all the available nodes in the cluster, so that the program can be read locally on all the worker nodes.

How are Spark jobs scheduled?

By default, Spark's scheduler runs jobs in FIFO fashion. Each job is divided into “stages” (e.g. map and reduce phases), and the first job gets priority on all available resources while its stages have tasks to launch, then the second job gets priority, etc.

How does Spark work internally, and what are the roles of the driver and executors?

The central coordinator is called the Spark Driver and it communicates with all the Workers. Each Worker node hosts one or more Executors, which are responsible for running the Tasks. Executors register themselves with the Driver, so the Driver has complete information about the Executors at all times.

How do Spark jobs work?

When you run your code in Spark, the SparkContext in the driver program creates a job whenever an action is called. The job is submitted to the DAG Scheduler, which creates the operator graph and then submits it to the Task Scheduler. The Task Scheduler launches the tasks via the cluster manager.


1 Answer

I find the official documentation quite thorough; it covers all your questions. However, one might find it hard to digest the first time.

Let us put down some definitions and rough analogues before we delve into the details. An application is what creates a SparkContext sc and may be thought of as something you deploy with spark-submit. A job is an action in Spark's sense of the transformation/action distinction, meaning anything like count, collect, etc.

There are two main, and in some sense separate, topics: scheduling across applications and scheduling within an application. The former relates more to resource managers, including Spark Standalone's FIFO-only mode, and to the concept of static and dynamic allocation.

The latter, scheduling within a Spark application, is the subject of your question, as I understood from your comment. Let me try to describe what happens there at some level of abstraction.

Suppose you have submitted your application and it has two jobs:

sc.textFile("..").count()   //job1
sc.textFile("..").collect() //job2

If this code happens to be executed in the same thread, there is not much of interest happening here: job2 and all its tasks get resources only after job1 is done.

Now say you have the following

thread1 { job1 }
thread2 { job2 } 

This is where it gets interesting. By default, within your application, the scheduler will use FIFO to allocate resources to all the tasks of whichever job happens to reach the scheduler first. Tasks of the other job get resources only when there are spare cores and no more pending tasks from the more "prioritized" first job.
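To make the threaded scenario concrete, here is a minimal runnable sketch (the input paths are made up; any two actions on sc would do):

// Two jobs submitted concurrently from separate threads of the same
// application; under the default FIFO mode, whichever job reaches the
// scheduler first gets priority for all of its tasks.
val t1 = new Thread(new Runnable {
  def run(): Unit = sc.textFile("hdfs:///data/a").count()   // job1
})
val t2 = new Thread(new Runnable {
  def run(): Unit = sc.textFile("hdfs:///data/b").collect() // job2
})
t1.start(); t2.start()
t1.join(); t2.join()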

Now suppose you set spark.scheduler.mode=FAIR for your application. From now on, each job has a notion of the pool it belongs to. If you do nothing, then every job's pool label is "default". To set the label for your job, you can do the following:

sc.setLocalProperty("spark.scheduler.pool", "pool1")
sc.textFile("..").count()   // job1 runs in pool1
sc.setLocalProperty("spark.scheduler.pool", "pool2")
sc.textFile("..").collect() // job2 runs in pool2
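As an aside, here is a minimal sketch of enabling FAIR mode programmatically when constructing the context (the app name is made up; the property can equally go into spark-defaults.conf or a spark-submit --conf flag):

import org.apache.spark.{SparkConf, SparkContext}

// Without this setting, the pool labels above have no scheduling
// effect: the scheduler keeps running jobs in plain FIFO order.
val conf = new SparkConf()
  .setAppName("fair-scheduling-demo") // illustrative name
  .set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)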

One important note here is that setLocalProperty is effective per thread, and is also inherited by all threads spawned from it. What does this mean for us? Well, if you are within a single thread it means nothing, as jobs are executed one after another. However, once you have the following

thread1 { job1 } // pool1
thread2 { job2 } // pool2

job1 and job2 become unrelated in the sense of resource allocation. In general, by properly configuring each pool in the fair scheduler allocation file with minShare > 0, you can be sure that jobs from different pools will always have resources to proceed.

However, you can go even further. By default, within each pool jobs are queued up in FIFO manner, which is basically the same situation as the FIFO-mode scenario with jobs from different threads. To change that, you need to change the pool's entry in the XML file to have <schedulingMode>FAIR</schedulingMode>, as sketched below.
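For reference, a minimal allocation file along those lines might look like this (the pool names match the example above; point Spark at the file via spark.scheduler.allocation.file):

<?xml version="1.0"?>
<allocations>
  <!-- minShare guarantees each pool a minimum number of cores;
       schedulingMode FAIR makes jobs inside a pool share fairly too. -->
  <pool name="pool1">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="pool2">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
</allocations>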

Given all that, if you just set spark.scheduler.mode=FAIR and let all the jobs fall into the same "default" pool, this is roughly the same as using the default spark.scheduler.mode=FIFO and launching your jobs from different threads. If you still want just a single fair "default" pool, change the config for the "default" pool in the XML file to reflect that.

To leverage the mechanism of pools, you need to define your own concept of a user, which amounts to setting "spark.scheduler.pool" from the proper thread to the proper value. For example, if your application listens to JMS, then a message processor may set the pool label for each message-processing job depending on the message's content, as in the sketch below.
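A hedged sketch of that idea (Message, poolFor, and handle are hypothetical names invented for illustration; the only real Spark API used is setLocalProperty):

case class Message(team: String, path: String) // hypothetical message shape

def poolFor(msg: Message): String =            // hypothetical team-to-pool mapping
  if (msg.team == "team1") "pool1" else "pool2"

def handle(msg: Message): Unit = {
  val worker = new Thread(new Runnable {
    def run(): Unit = {
      // Local properties are per thread, so this affects only the
      // jobs submitted from this worker thread.
      sc.setLocalProperty("spark.scheduler.pool", poolFor(msg))
      sc.textFile(msg.path).count()
    }
  })
  worker.start()
}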

In the end, I am not sure whether this uses fewer words than the official docs, but hopefully it helps in some way :)

answered Sep 21 '22 by kasur