 

Why so many tasks in my spark job? Getting 200 Tasks By Default


I have a Spark job that takes a file with 8 records from HDFS, does a simple aggregation, and saves the result back to HDFS. I notice there are hundreds of tasks when I do this.

I am also not sure why there are multiple jobs for this. I thought a job was created each time an action happened. I can speculate as to why, but my understanding was that this code should be one job, broken down into stages, not multiple jobs. Why doesn't it just break it down into stages? Why does it break it into jobs?

As for the 200-plus tasks: since the amount of data and the number of nodes are minuscule, it doesn't make sense that there are about 25 tasks for each row of data when there is only one aggregation and a couple of filters. Why wouldn't it just have one task per partition per atomic operation?

Here is the relevant Scala code:

    import org.apache.spark.sql._
    import org.apache.spark.sql.types._
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf

    object TestProj {
      def main(args: Array[String]) {

        /* set the application name in the SparkConf object */
        val appConf = new SparkConf().setAppName("Test Proj")

        /* env settings that I don't need to set in REPL */
        val sc = new SparkContext(appConf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        val rdd1 = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")

        /* the below rdd will have schema defined in Record class */
        val rddCase = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")
          .map(x => x.split(" "))    // split each file record into an array of strings on spaces
          .map(x => Record(
            x(0).toInt,
            x(1).asInstanceOf[String],
            x(2).asInstanceOf[String],
            x(3).toInt))

        /* the below dataframe groups on first letter of first name and counts it */
        val aggDF = rddCase.toDF()
          .groupBy($"firstName".substr(1,1).alias("firstLetter"))
          .count
          .orderBy($"firstLetter")

        /* save to hdfs */
        aggDF.write.format("parquet").mode("append").save("/raw/miscellaneous/ex_out_agg")
      }

      case class Record(id: Int
          , firstName: String
          , lastName: String
          , quantity: Int)
    }

Below is the screenshot after clicking on the application:

[image]

Below are the stages shown when viewing the specific "job" with ID 0:

[image]

Below is the first part of the screen when clicking on the stage with over 200 tasks:

[image]

This is the second part of the screen inside the stage:

[image]

Below is the screen after clicking on the "Executors" tab:

[image]

As requested, here are the stages for Job ID 1:

[image]

Here are the details for the stage in Job ID 1 with 200 tasks:

[image]

asked Jun 11 '16 00:06 by uh_big_mike_boi




1 Answer

This is a classic Spark question.

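The two tasks used for reading (Stage Id 0 in the second figure) come from the defaultMinPartitions setting, which is 2 here. You can check the value in the REPL by evaluating sc.defaultMinPartitions. It is computed as min(sc.defaultParallelism, 2) rather than being a property of its own; if you set spark.default.parallelism explicitly, that property should be visible in the Spark UI under the "Environment" tab.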

You can take a look at the code on GitHub to see that this is exactly what is happening. If you want more partitions to be used on read, pass the minimum number of partitions as the second argument, e.g., sc.textFile("a.txt", 20).
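To see the effect of that second argument, a minimal sketch along these lines may help. It assumes a local master (local[2]) and a hypothetical input file a.txt that actually exists; neither comes from the question. The second argument is a minimum, so the resulting partition count can differ from the number you pass.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ReadPartitions {
  def main(args: Array[String]): Unit = {
    // Assumption: local[2] is used only so the sketch runs without a cluster.
    val conf = new SparkConf().setAppName("ReadPartitions").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Hypothetical path; replace it with a file that exists in your environment.
    val path = "a.txt"

    // The minimum used by textFile when no second argument is given.
    println(s"defaultMinPartitions = ${sc.defaultMinPartitions}")

    val byDefault = sc.textFile(path)      // uses defaultMinPartitions
    val withHint  = sc.textFile(path, 20)  // asks for at least 20 partitions

    // Each partition becomes one task in the stage that reads the file.
    println(s"partitions with default   = ${byDefault.partitions.length}")
    println(s"partitions with minimum 20 = ${withHint.partitions.length}")

    sc.stop()
  }
}
```

The number of partitions printed here is the number of tasks you should expect to see in the read stage in the Spark UI.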

Now the interesting part is the 200 partitions in the second stage (Stage Id 1 in the second figure). Each time there is a shuffle, Spark needs to decide how many partitions the shuffle RDD will have. For DataFrame and Spark SQL operations this number comes from spark.sql.shuffle.partitions, and its default is 200.

You can change that using:

sqlContext.setConf("spark.sql.shuffle.partitions", "4")

If you run your code with this configuration, the shuffle stage will use 4 partitions instead of 200. Choosing a value for this parameter is something of an art; a reasonable starting point may be 2x the number of cores you have.
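As a rough illustration of that rule of thumb, here is a sketch that derives the setting from the core count. The helper suggested and the Person case class are hypothetical names introduced for this example, local[2] is assumed so it runs without a cluster, and treating sc.defaultParallelism as the total core count is an approximation, not something the question confirms.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical record type, used only to build a tiny DataFrame.
case class Person(firstName: String)

object ShufflePartitions {
  // Rule of thumb from the answer: about twice the available cores, never below 1.
  def suggested(totalCores: Int, factor: Int = 2): Int =
    math.max(1, totalCores * factor)

  def main(args: Array[String]): Unit = {
    // Assumption: local[2] is used only so the sketch runs without a cluster.
    val conf = new SparkConf().setAppName("ShufflePartitions").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Assumption: defaultParallelism is a stand-in for the total number of cores.
    val n = suggested(sc.defaultParallelism)
    sqlContext.setConf("spark.sql.shuffle.partitions", n.toString)

    val counts = Seq(Person("alice"), Person("bob"), Person("anna")).toDF()
      .groupBy($"firstName".substr(1, 1).alias("firstLetter"))
      .count()

    // Number of partitions after the shuffle; newer Spark versions may
    // merge small shuffle partitions, so this can be lower than n.
    println(s"shuffle partitions requested = $n")
    println(s"partitions after groupBy     = ${counts.rdd.partitions.length}")

    sc.stop()
  }
}
```

Setting the value before the aggregation runs is what matters; changing it afterwards does not affect a plan that has already executed.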

I think Spark 2.0 has a way to automatically infer the best number of partitions for shuffle RDDs. Looking forward to that!

Finally, the number of jobs you get depends on how many RDD actions the optimized DataFrame code results in. If you read the Spark specs, they say that each RDD action triggers one job. When your action involves a DataFrame or Spark SQL, the Catalyst optimizer figures out an execution plan and generates some RDD-based code to execute it. It's hard to say exactly why it uses two actions in your case. You may need to look at the optimized query plan to see exactly what it is doing.
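One way to look at that plan is DataFrame.explain. The sketch below rebuilds the question's aggregation on a few made-up rows (the sample data and the local[2] master are assumptions for illustration) and prints the logical and physical plans. A plausible cause of the extra job, which this answer does not confirm, is that orderBy needs range partitioning and Spark samples the data in a separate job to pick the range boundaries; the physical plan should show whether a range-partitioned exchange is involved.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Same shape as the Record class in the question.
case class Record(id: Int, firstName: String, lastName: String, quantity: Int)

object InspectPlan {
  def main(args: Array[String]): Unit = {
    // Assumption: local[2] is used only so the sketch runs without a cluster.
    val conf = new SparkConf().setAppName("InspectPlan").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Made-up rows standing in for the 8-record file.
    val rows = Seq(
      Record(1, "alice", "smith", 3),
      Record(2, "anna", "jones", 5),
      Record(3, "bob", "brown", 2))

    val aggDF = rows.toDF()
      .groupBy($"firstName".substr(1, 1).alias("firstLetter"))
      .count()
      .orderBy($"firstLetter")

    // extended = true prints the logical plans as well as the physical plan.
    aggDF.explain(true)

    sc.stop()
  }
}
```

Each exchange in the physical plan corresponds to a shuffle, so comparing the plan with the stages in the Spark UI shows which operator produced which stage.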

answered Oct 03 '22 22:10 by marios