The "old" SparkContext.hadoopFile
takes a minPartitions
argument, which is a hint for the number of partitions:
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions
): RDD[(K, V)]
But there is no such argument on SparkContext.newAPIHadoopFile:
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
    path: String,
    fClass: Class[F],
    kClass: Class[K],
    vClass: Class[V],
    conf: Configuration = hadoopConfiguration): RDD[(K, V)]
In fact, mapred.InputFormat.getSplits takes a hint for the desired number of splits, but mapreduce.InputFormat.getSplits takes only a JobContext. How can I influence the number of splits through the new API?
I have tried setting mapreduce.input.fileinputformat.split.maxsize and fs.s3n.block.size on the Configuration object, but they had no effect. I am trying to load a 4.5 GB file from s3n, and it gets loaded in a single task.
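For reference, a minimal sketch of that attempt (the s3n path, the 64 MB value, and the variable names are placeholders, not code from the actual job):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Copy the SparkContext's Hadoop configuration and override the split-related settings.
val hadoopConf = new Configuration(sc.hadoopConfiguration)
hadoopConf.set("mapreduce.input.fileinputformat.split.maxsize", (64 * 1024 * 1024).toString)
hadoopConf.set("fs.s3n.block.size", (64 * 1024 * 1024).toString)

// Pass the customized configuration to the new-API loader.
val rdd = sc.newAPIHadoopFile(
  "s3n://some-bucket/big-file",   // placeholder path
  classOf[TextInputFormat],       // org.apache.hadoop.mapreduce.lib.input.TextInputFormat
  classOf[LongWritable],
  classOf[Text],
  hadoopConf)
println(rdd.partitions.length)    // still 1 in the scenario described above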
https://issues.apache.org/jira/browse/HADOOP-5861 is relevant, but it suggests that I should already see more than one split, since the default block size is 64 MB.
By default, when parallelizing an in-memory collection, Spark/PySpark creates as many partitions as there are CPU cores available to the application. The data of each partition resides on a single machine, Spark creates one task per partition, and shuffle operations move data from one partition to others.
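As a quick way to see that relationship from a shell (rdd here is just a placeholder for an already-loaded RDD):

// One task is launched per partition when an action runs.
println(rdd.partitions.length)

// A shuffle operation such as repartition redistributes the data across partitions.
val reshuffled = rdd.repartition(16)   // 16 is an arbitrary example
println(reshuffled.partitions.length)  // 16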
For file-based RDDs, Spark by default creates one partition for each block of the file (blocks being 128 MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger minPartitions value.
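With the old API shown at the top of the question, that larger value is the minPartitions hint; a sketch (the path and the value 64 are arbitrary placeholders):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Old-API load with an explicit minimum-partitions hint.
val oldApiRdd = sc.hadoopFile(
  "s3n://some-bucket/big-file",   // placeholder path
  classOf[TextInputFormat],       // org.apache.hadoop.mapred.TextInputFormat (old API)
  classOf[LongWritable],
  classOf[Text],
  64)                             // minPartitions hint
println(oldApiRdd.partitions.length)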
The function newAPIHadoopFile allows you to pass in a Configuration object, and on that you can set mapred.max.split.size. Even though this property is in the mapred namespace, since there is seemingly no new option, I would imagine the new API will still respect the variable.