This question is the same as Number of partitions of a spark dataframe created by reading the data from Hive table, but I think that question did not get a correct answer.
Note that the question asks how many partitions will be created when the DataFrame is created as a result of executing a SQL query against a Hive table using the SparkSession.sql method.
IIUC, the question above is distinct from asking how many partitions will be created when the DataFrame is created as a result of executing some code like spark.read.json("examples/src/main/resources/people.json"),
which loads the data directly from the filesystem (which could be HDFS). I think the answer to this latter question is given by spark.sql.files.maxPartitionBytes:
spark.sql.files.maxPartitionBytes (default: 134217728, i.e. 128 MB): The maximum number of bytes to pack into a single partition when reading files.
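For concreteness, a minimal sketch of that file-based path (the JSON path is the one from the Spark examples; spark is the spark-shell session, and the exact printed default may vary by Spark version):

// The config that governs partitioning for direct file reads (default 128 MB).
println(spark.conf.get("spark.sql.files.maxPartitionBytes"))

// Partition count is then roughly total input bytes / maxPartitionBytes,
// subject to file-count and parallelism heuristics.
val jsonDf = spark.read.json("examples/src/main/resources/people.json")
println(jsonDf.rdd.getNumPartitions)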
Experimentally, I have tried creating a DataFrame from a Hive table, and the number of partitions I get is not explained by total data in the Hive table / spark.sql.files.maxPartitionBytes.
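For reference, a sketch of the experiment (my_db.my_table is a placeholder Hive table; spark is the spark-shell session):

// Placeholder Hive table; the DataFrame is created via SparkSession.sql.
val df = spark.sql("SELECT * FROM my_db.my_table")

// Number of partitions of the resulting DataFrame.
println(df.rdd.getNumPartitions)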
Also, adding to the OP, it would be good to know how the number of partitions can be controlled, i.e. when one wants to force Spark to use a different number than it would use by default.
References:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
In Spark, one should carefully choose the number of partitions depending on the cluster design and application requirements. A common rule of thumb for the number of partitions in an RDD is a small multiple (typically two to four) of the number of cores in the cluster.
repartition() can be used for increasing or decreasing the number of partitions of a Spark DataFrame. However, repartition() involves shuffling which is a costly operation.
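For example, a minimal sketch (the table name and the target of 10 partitions are arbitrary placeholders):

// Placeholder table; spark is the spark-shell SparkSession.
val df = spark.table("my_db.my_table")

// Explicitly set the number of partitions; repartition() triggers a full shuffle.
val repartitioned = df.repartition(10)
println(repartitioned.rdd.getNumPartitions)  // 10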
Spark reads the data in blocks of 128 MB each. For example, if your Hive table is approximately 14.8 GB, the data is divided into 128 MB blocks, resulting in 119 partitions. If, on the other hand, your Hive table is partitioned and the partition column has 150 unique values, the number of partitions reflects that layout instead.
By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value.
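As a sketch of that behaviour with the RDD API (the HDFS path and the value 400 are hypothetical):

// Default: roughly one partition per 128 MB HDFS block.
val rdd = spark.sparkContext.textFile("hdfs:///data/people.txt")
println(rdd.getNumPartitions)

// Ask for at least 400 partitions, i.e. a larger value than the block-based default.
val rddMore = spark.sparkContext.textFile("hdfs:///data/people.txt", minPartitions = 400)
println(rddMore.getNumPartitions)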
TL;DR: The default number of partitions when reading data from Hive is governed by the HDFS blockSize. The number of partitions can be increased by setting mapreduce.job.maps to an appropriate value, and decreased by setting mapreduce.input.fileinputformat.split.minsize to an appropriate value.
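A hedged sketch of passing those Hadoop settings from a Spark application (the concrete values are arbitrary examples); the details behind these keys follow below:

// The Hadoop configuration used when Spark plans the Hive table scan.
val hadoopConf = spark.sparkContext.hadoopConfiguration

// Raising the minimum split size decreases the number of partitions.
hadoopConf.setLong("mapreduce.input.fileinputformat.split.minsize", 256L * 1024 * 1024)

// Raising the hinted number of map tasks can increase the number of partitions (old API).
hadoopConf.setInt("mapreduce.job.maps", 400)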
Spark SQL creates an instance of HadoopRDD when loading data from a Hive table.
An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS, sources in HBase, or S3), using the older MapReduce API (org.apache.hadoop.mapred).
HadoopRDD in turn splits input files according to the computeSplitSize method defined in org.apache.hadoop.mapreduce.lib.input.FileInputFormat (the new API) and org.apache.hadoop.mapred.FileInputFormat (the old API).
New API:
protected long computeSplitSize(long blockSize, long minSize,
                                long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}
Old API:
protected long computeSplitSize(long goalSize, long minSize,
                                long blockSize) {
  return Math.max(minSize, Math.min(goalSize, blockSize));
}
computeSplitSize splits files according to the HDFS blockSize, but if the blockSize is less than minSize or greater than maxSize it is clamped to those extremes. The HDFS blockSize can be obtained with:
hdfs getconf -confKey dfs.blocksize
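To make the clamping concrete, here is a small Scala sketch of the same formula with illustrative values (the minSize and maxSize values are hypothetical):

// Mirrors FileInputFormat.computeSplitSize (new API): max(minSize, min(maxSize, blockSize)).
def computeSplitSize(blockSize: Long, minSize: Long, maxSize: Long): Long =
  math.max(minSize, math.min(maxSize, blockSize))

val blockSize = 128L * 1024 * 1024                                       // from dfs.blocksize
println(computeSplitSize(blockSize, 1L, Long.MaxValue))                  // 134217728: block size wins
println(computeSplitSize(blockSize, 256L * 1024 * 1024, Long.MaxValue))  // 268435456: clamped up to minSize
println(computeSplitSize(blockSize, 1L, 64L * 1024 * 1024))              // 67108864: clamped down to maxSize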
According to Hadoop: The Definitive Guide (Table 8.5), minSize is obtained from mapreduce.input.fileinputformat.split.minsize and maxSize is obtained from mapreduce.input.fileinputformat.split.maxsize.
However, the book also mentions the following regarding mapreduce.input.fileinputformat.split.maxsize:
This property is not present in the old MapReduce API (with the exception of CombineFileInputFormat). Instead, it is calculated indirectly as the size of the total input for the job, divided by the guide number of map tasks specified by mapreduce.job.maps (or the setNumMapTasks() method on JobConf).
This post also calculates the maxSize as the total input size divided by the number of map tasks.
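As a hedged worked example for the old API, using the 14.8 GB figure mentioned earlier (goalSize is the total input divided by the requested number of map tasks; the values of 2 for an untuned job and 400 for a tuned one are assumptions for illustration):

// Old-API sketch: goalSize = totalInputSize / numMapTasks, then clamped against blockSize/minSize.
val totalInputBytes = (14.8 * 1024 * 1024 * 1024).toLong   // ~14.8 GB of table data
val blockSize       = 128L * 1024 * 1024                   // dfs.blocksize
val minSize         = 1L                                   // default split.minsize

// With only 2 map tasks requested, goalSize is huge, so the block size wins:
val splitDefault = math.max(minSize, math.min(totalInputBytes / 2, blockSize))
println(totalInputBytes / splitDefault)                    // prints 118; with per-file remainders, roughly 119 splits

// Raising mapreduce.job.maps to 400 pushes goalSize below the block size:
val splitMore = math.max(minSize, math.min(totalInputBytes / 400, blockSize))
println(totalInputBytes / splitMore)                       // roughly 400 splits/partitions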