How to control partition size in Spark SQL


I have a requirement to load data from a Hive table using Spark SQL's HiveContext and write it to HDFS. By default, the DataFrame produced by the SQL query has only 2 partitions. To get more parallelism I need more partitions out of the SQL, but there is no overloaded method in HiveContext that takes a number-of-partitions parameter.

Repartitioning of the RDD causes shuffling and results in more processing time.


val result = sqlContext.sql("select * from bt_st_ent") 

This produces the following log output:

Starting task 0.0 in stage 131.0 (TID 297, aster1.com, partition 0, NODE_LOCAL, 2203 bytes)
Starting task 1.0 in stage 131.0 (TID 298, aster1.com, partition 1, NODE_LOCAL, 2204 bytes)

I would like to know whether there is any way to increase the number of partitions of the SQL output.

asked Jul 07 '16 by nagendra

People also ask

How do I reduce the number of partitions in Spark?

Spark RDD coalesce() is used only to reduce the number of partitions. It is an optimized version of repartition() that moves less data across partitions.
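For example, a minimal sketch (df stands for any existing DataFrame, which is an assumption here):

// Reduce to 4 partitions without a full shuffle (coalesce is a narrow transformation)
val fewer = df.coalesce(4)
println(fewer.rdd.getNumPartitions)   // at most 4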

How do I change my Spark partition?

If you want to increase the number of partitions of your DataFrame, all you need to run is the repartition() function. It returns a new DataFrame partitioned by the given partitioning expressions; the resulting DataFrame is hash partitioned.
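For example, a sketch where the DataFrame df and the column name st_id are illustrative assumptions:

import org.apache.spark.sql.functions.col

// Full shuffle into 200 partitions
val byCount = df.repartition(200)

// Full shuffle, hash-partitioned on the (assumed) column st_id
val byKey = df.repartition(200, col("st_id"))

println(byKey.rdd.getNumPartitions)   // 200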

What is partition size Spark?

The ideal size of each partition is around 100-200 MB. Smaller partitions increase the number of parallel tasks, which can improve performance, but partitions that are too small add scheduling overhead and increase GC time.
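As a rough back-of-the-envelope sketch (plain Scala, not a Spark API; the input size is an assumption):

val totalSizeMB   = 50 * 1024                 // assumed input size: ~50 GB
val targetPartMB  = 128                       // aim for ~100-200 MB per partition
val numPartitions = math.max(1, totalSizeMB / targetPartMB)
println(numPartitions)                        // 400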

What is default partition size in Spark?

By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.
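For example, a sketch of asking for more partitions than the block count when reading a file (the path is a placeholder):

// Ask for at least 400 partitions when reading a file-based RDD
val rdd = sc.textFile("hdfs:///data/events.log", 400)   // second argument = minPartitions
println(rdd.getNumPartitions)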


2 Answers

Spark < 2.0:

You can use Hadoop configuration options:

  • mapred.min.split.size
  • mapred.max.split.size

as well as HDFS block size to control partition size for filesystem based formats*.

val minSplit: Int = ???
val maxSplit: Int = ???

sc.hadoopConfiguration.setInt("mapred.min.split.size", minSplit)
sc.hadoopConfiguration.setInt("mapred.max.split.size", maxSplit)

Spark 2.0+:

You can use spark.sql.files.maxPartitionBytes configuration:

spark.conf.set("spark.sql.files.maxPartitionBytes", maxSplit) 
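For example, a minimal sketch that caps file-based partitions at 32 MB before reading (the Parquet path is an assumption):

// 32 MB per partition for file-based sources
spark.conf.set("spark.sql.files.maxPartitionBytes", 32L * 1024 * 1024)

val df = spark.read.parquet("hdfs:///warehouse/bt_st_ent")   // assumed path
println(df.rdd.getNumPartitions)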

In both cases these values may not be used by a specific data source API, so you should always check the documentation / implementation details of the format you use.


* Other input formats can use different settings. See for example

  • Partitioning in spark while reading from RDBMS via JDBC (see the JDBC sketch after this list)
  • Difference between mapreduce split and spark partition
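For illustration, a sketch of a partitioned JDBC read; the connection details, table, and column names are assumptions, and the JDBC driver must be on the classpath:

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://db-host:3306/sales")   // assumed connection
  .option("dbtable", "bt_st_ent")
  .option("partitionColumn", "id")                    // numeric, date or timestamp column
  .option("lowerBound", "1")
  .option("upperBound", "1000000")
  .option("numPartitions", "20")
  .load()

println(jdbcDF.rdd.getNumPartitions)                  // 20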

Furthermore, Datasets created from RDDs will inherit the partition layout of their parents.

Similarly, bucketed tables will use the bucket layout defined in the metastore, with a 1:1 relationship between buckets and Dataset partitions.
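For example, a sketch of writing and reading a bucketed table (Spark 2.0+; the table and column names are assumptions):

// Write a table bucketed into 50 buckets on the (assumed) column st_id
df.write
  .bucketBy(50, "st_id")
  .sortBy("st_id")
  .saveAsTable("bt_st_ent_bucketed")

// Reading it back gives one Dataset partition per bucket (1:1)
val bucketed = spark.table("bt_st_ent_bucketed")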

answered Oct 16 '22 by zero323


A very common and painful problem. You should look for a key that distributes the data into uniform partitions. Then you can use the DISTRIBUTE BY and CLUSTER BY operators to tell Spark to group rows into partitions. This incurs some overhead on the query itself, but results in evenly sized partitions. Deepsense has a very good tutorial on this.
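For example, a sketch assuming st_id is a well-distributed key; the number of output partitions of the shuffle is governed by spark.sql.shuffle.partitions:

// Number of partitions produced by the shuffle behind DISTRIBUTE BY / CLUSTER BY
sqlContext.setConf("spark.sql.shuffle.partitions", "200")

// Rows sharing st_id end up in the same partition
val distributed = sqlContext.sql("SELECT * FROM bt_st_ent DISTRIBUTE BY st_id")

// CLUSTER BY = DISTRIBUTE BY + SORT BY on the same key
val clustered = sqlContext.sql("SELECT * FROM bt_st_ent CLUSTER BY st_id")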

answered Oct 16 '22 by Fokko Driesprong