
Spark RDD default number of partitions

Version: Spark 1.6.2, Scala 2.10

I am executing the commands below in the spark-shell to see how many partitions Spark creates by default.

val rdd1 = sc.parallelize(1 to 10)
println(rdd1.getNumPartitions) // ==> Result is 4

//Creating rdd for the local file test1.txt. It is not HDFS.
//File content is just one word "Hello"
val rdd2 = sc.textFile("C:/test1.txt")
println(rdd2.getNumPartitions) // ==> Result is 2

As per the Apache Spark documentation, spark.default.parallelism defaults to the number of cores on my laptop (which has a 2-core processor).

My question is: rdd2 seems to give the correct result of 2 partitions, as stated in the documentation. But why does rdd1 give 4 partitions?

Sri asked May 27 '17 22:05




1 Answer

The minimum number of partitions is actually a lower bound set by the SparkContext. Since Spark uses Hadoop under the hood, the Hadoop InputFormat still determines the default behaviour.

The first case should reflect defaultParallelism, which may differ depending on settings and hardware (number of cores, etc.).

So unless you provide the number of slices, the partition count in the first case is given by sc.defaultParallelism:

scala> sc.defaultParallelism
res0: Int = 6

scala> sc.parallelize(1 to 100).partitions.size
res1: Int = 6
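
To make that rule concrete, here is a minimal Spark-free sketch of it. The names ParallelizeModel, numPartitions and sliceBounds are hypothetical, and the even-slicing arithmetic is my assumption about how a collection is cut into contiguous index ranges; it is a model, not a copy of Spark's code:

```scala
object ParallelizeModel {
  // Partition count for a parallelized collection: the explicit numSlices
  // if one is given, otherwise the context's default parallelism.
  def numPartitions(numSlices: Option[Int], defaultParallelism: Int): Int =
    numSlices.getOrElse(defaultParallelism)

  // Index ranges [start, end) of each slice for a collection of `length`
  // elements (assumed even slicing; Long arithmetic avoids overflow).
  def sliceBounds(length: Int, numSlices: Int): Seq[(Int, Int)] = {
    require(numSlices > 0, "numSlices must be positive")
    (0 until numSlices).map { i =>
      val start = ((i.toLong * length) / numSlices).toInt
      val end = (((i + 1).toLong * length) / numSlices).toInt
      (start, end)
    }
  }
}
```

With a default parallelism of 4 and no explicit numSlices, numPartitions returns 4, and sliceBounds(10, 4) cuts the ten elements of 1 to 10 into ranges holding 2, 3, 2 and 3 elements.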

As for the second case, with sc.textFile, the default number of slices is the minimum number of partitions, sc.defaultMinPartitions. It is equal to 2 here, because the Spark source defines it as the smaller of defaultParallelism and 2.
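
As a rough illustration of where that 2 comes from, the sketch below models how the minimum-partitions hint could be resolved. MinPartitionsModel and its method names are hypothetical, and the cap at 2 is the rule as I understand it from the Spark source, so treat it as an assumption:

```scala
object MinPartitionsModel {
  // Assumed rule: the default minimum is the smaller of the default
  // parallelism and 2, so it never exceeds 2.
  def defaultMinPartitions(defaultParallelism: Int): Int =
    math.min(defaultParallelism, 2)

  // The hint handed to the input format: an explicit minPartitions
  // argument wins, otherwise the default minimum is used.
  def minPartitionsHint(explicit: Option[Int], defaultParallelism: Int): Int =
    explicit.getOrElse(defaultMinPartitions(defaultParallelism))
}
```

So a context with a default parallelism of 6 still yields a hint of 2, while a single-core context yields 1, and an explicit value such as 10 is passed through unchanged.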

Thus, you should consider the following:

  • sc.parallelize will take numSlices or defaultParallelism.

  • sc.textFile will take the maximum between minPartitions and the number of splits that Hadoop computes from the total input size divided by the split size, which is capped by the file system block size.

    • sc.textFile calls sc.hadoopFile, which creates a HadoopRDD that uses InputFormat.getSplits under the hood [Ref. InputFormat documentation].

    • InputSplit[] getSplits(JobConf job, int numSplits) throws IOException: Logically split the set of input files for the job. Each InputSplit is then assigned to an individual Mapper for processing. Note: The split is a logical split of the inputs and the input files are not physically split into chunks. For e.g. a split could be <input-file-path, start, offset> tuple. Parameters: job - job configuration; numSplits - the desired number of splits, a hint. Returns: an array of InputSplits for the job. Throws: IOException.

Example:

Let's create some dummy text files:

fallocate -l 241m bigfile.txt
fallocate -l 4G hugefile.txt

This will create two files of size 241MB and 4GB, respectively.

We can see what happens when we read each of the files:

scala> val rdd = sc.textFile("bigfile.txt")
// rdd: org.apache.spark.rdd.RDD[String] = bigfile.txt MapPartitionsRDD[1] at textFile at <console>:27

scala> rdd.getNumPartitions
// res0: Int = 8

scala> val rdd2 = sc.textFile("hugefile.txt")
// rdd2: org.apache.spark.rdd.RDD[String] = hugefile.txt MapPartitionsRDD[3] at textFile at <console>:27

scala> rdd2.getNumPartitions
// res1: Int = 128
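
The numbers 8 and 128 can be reproduced with a small Spark-free sketch of the split arithmetic. This is a simplified model of what Hadoop's FileInputFormat does for a single non-empty file, not its actual code: SplitEstimate and its method names are hypothetical, and the 1.1 slop factor and the 32MB local block size used in the note below are assumptions on my side:

```scala
object SplitEstimate {
  // Assumed slop factor: a last chunk up to 10% larger than the split
  // size is kept as one split instead of being cut again.
  private val SplitSlop = 1.1

  // Split size: the goal size capped by the block size, never below minSize.
  def splitSize(goalSize: Long, minSize: Long, blockSize: Long): Long =
    math.max(minSize, math.min(goalSize, blockSize))

  // Estimated number of splits for one non-empty file.
  def estimateSplits(fileBytes: Long, minPartitions: Int,
                     blockSize: Long, minSize: Long = 1L): Int = {
    require(fileBytes > 0, "this sketch only covers non-empty files")
    require(minSize > 0 && blockSize > 0, "sizes must be positive")
    // Goal size: what each split would hold if the hint were met exactly.
    val goalSize = fileBytes / math.max(minPartitions, 1)
    val size = splitSize(goalSize, minSize, blockSize)
    var remaining = fileBytes
    var splits = 0
    while (remaining.toDouble / size > SplitSlop) {
      splits += 1
      remaining -= size
    }
    if (remaining != 0) splits += 1 // trailing partial split
    splits
  }
}
```

Assuming a 32MB block size and a minPartitions hint of 2, estimateSplits(241L * 1024 * 1024, 2, 32L * 1024 * 1024) returns 8 and the same call for the 4GB file returns 128, which matches the partition counts shown above. In both cases the goal size (half the file) is larger than the block size, so the block size decides the split size.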

Both RDDs are actually backed by HadoopRDDs:

scala> rdd.toDebugString
// res2: String = 
// (8) bigfile.txt MapPartitionsRDD[1] at textFile at <console>:27 []
//  |  bigfile.txt HadoopRDD[0] at textFile at <console>:27 []

scala> rdd2.toDebugString
// res3: String = 
// (128) hugefile.txt MapPartitionsRDD[3] at textFile at <console>:27 []
//   |   hugefile.txt HadoopRDD[2] at textFile at <console>:27 []

eliasah answered Oct 07 '22 10:10
