scala> val p = sc.textFile("file:///c:/_home/so-posts.xml", 8) // I have 8 cores
p: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[56] at textFile at <console>:21
scala> p.partitions.size
res33: Int = 729
I was expecting 8 to be printed, but I see 729 tasks in the Spark UI.
EDIT:
After calling repartition()
as suggested by @zero323
scala> val p1 = p.repartition(8)
scala> p1.partitions.size
res60: Int = 8
scala> p1.count
I still see 729 tasks in the Spark UI even though the spark-shell prints 8.
Hash partitioning in Spark attempts to spread the data evenly across partitions based on the key. The Object.hashCode method is used to determine the partition, as partition = key.hashCode() % numPartitions.
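As a quick illustration (a minimal sketch, not from the original post), HashPartitioner applies exactly that rule to a pair RDD:
import org.apache.spark.HashPartitioner

// Each key is assigned to partition key.hashCode() % numPartitions.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
val byHash = pairs.partitionBy(new HashPartitioner(8))
byHash.partitions.size  // 8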
Spark will run one task for each partition of the cluster.
The coalesce() and repartition() transformations are both used to change the number of partitions of an RDD. The main difference is that if you are increasing the number of partitions you should use repartition(), which performs a full shuffle.
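For example (a hedged sketch using the RDD p loaded above; the partition counts are illustrative):
val grown  = p.repartition(16)   // full shuffle, 16 partitions
val shrunk = p.coalesce(4)       // avoids a full shuffle, 4 partitions
grown.partitions.size            // 16
shrunk.partitions.size           // 4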
By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value.
If you take a look at the signature
textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
you'll see that the argument you use is called minPartitions
and this pretty much describes its function. In some cases even that is ignored, but that is a different matter. The input format used behind the scenes still decides how to compute splits.
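To see that it is only a lower bound, compare a small and a large value (a hedged sketch; exact counts depend on the file and the block size):
val few  = sc.textFile("file:///c:/_home/so-posts.xml", 2)
few.partitions.size   // still driven by the computed split size, so likely 729 again
val many = sc.textFile("file:///c:/_home/so-posts.xml", 2000)
many.partitions.size  // at least roughly 2000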
In this particular case you could probably use mapred.min.split.size to increase the split size (this will work during load) or simply repartition after loading (this will take effect after the data is loaded), but in general there should be no need for that.
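Both options in one hedged sketch (the 256 MB split size is just an illustrative value, and the exact partition counts depend on the file):
// Option 1, at load time: raise the minimum split size so TextInputFormat
// computes fewer, larger splits.
sc.hadoopConfiguration.set("mapred.min.split.size", (256L * 1024 * 1024).toString)
val coarse = sc.textFile("file:///c:/_home/so-posts.xml", 8)
coarse.partitions.size  // fewer partitions than 729

// Option 2, after loading: shuffle into exactly 8 partitions.
val p1 = p.repartition(8)
p1.partitions.size      // 8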
@zero323 nailed it, but I thought I'd add a bit more (low-level) background on how this minPartitions
input parameter influences the number of partitions.
tl;dr The partition parameter does have an effect on SparkContext.textFile
as the minimum (not the exact!) number of partitions.
In this particular case of using SparkContext.textFile, the number of partitions is calculated directly by org.apache.hadoop.mapred.TextInputFormat.getSplits(jobConf, minPartitions), which is used by textFile. TextInputFormat only knows how to partition (aka split) the distributed data, with Spark merely following its advice.
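For reference, textFile is roughly a hadoopFile call over the old-API TextInputFormat, keeping only the values (a simplified sketch paraphrasing Spark's sources):
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// The split computation is delegated to TextInputFormat.getSplits;
// the 8 passed here only acts as the minimum number of splits.
val viaHadoopFile = sc
  .hadoopFile("file:///c:/_home/so-posts.xml", classOf[TextInputFormat],
    classOf[LongWritable], classOf[Text], 8)
  .map(_._2.toString)
viaHadoopFile.partitions.size  // same split logic as textFile, so 729 again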
From Hadoop's FileInputFormat's javadoc:
FileInputFormat is the base class for all file-based InputFormats. This provides a generic implementation of getSplits(JobConf, int). Subclasses of FileInputFormat can also override the isSplitable(FileSystem, Path) method to ensure input-files are not split-up and are processed as a whole by Mappers.
It is a very good example of how Spark leverages the Hadoop API.
BTW, you may find the sources enlightening ;-)