
What should be the optimal value for spark.sql.shuffle.partitions or how do we increase partitions when using Spark SQL?

I am using Spark SQL (actually hiveContext.sql()), which runs GROUP BY queries, and I am running into OOM issues. I am thinking of increasing the value of spark.sql.shuffle.partitions from the default of 200 to 1000, but it is not helping.

I believe these partitions share the shuffle load, so the more partitions there are, the less data each one has to hold. I am new to Spark. I am using Spark 1.4.0 and I have around 1 TB of uncompressed data to process with hiveContext.sql() GROUP BY queries.
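For context, a minimal sketch of the setup being described, assuming Spark 1.4; the table and column names (sales, customer_id, amount) are hypothetical:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    val sc = new SparkContext(new SparkConf().setAppName("shuffle-tuning"))
    val hiveContext = new HiveContext(sc)

    // Raise the shuffle partition count from the default of 200.
    hiveContext.setConf("spark.sql.shuffle.partitions", "1000")

    // Hypothetical GROUP BY query of the kind that hits the OOM.
    val result = hiveContext.sql(
      "SELECT customer_id, SUM(amount) AS total FROM sales GROUP BY customer_id")
    result.show()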

asked Sep 02 '15 by Umesh K


People also ask

How do you optimize shuffling in Spark?

The next strategy is to reduce the amount of data being shuffled as a whole. A few things you can do are: get rid of the columns you don't need, filter out unnecessary records, and optimize data ingestion. De-normalize the datasets, especially if the shuffle is caused by a join.
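As an illustration only, a hedged sketch of that strategy; the DataFrames, columns, and filter predicate below are all hypothetical:

    import org.apache.spark.sql.DataFrame

    // Prune columns and rows before the join so less data is shuffled.
    def slimJoin(orders: DataFrame, customers: DataFrame): DataFrame = {
      val slimOrders = orders
        .select("order_id", "customer_id", "amount") // keep only needed columns
        .filter("amount > 0")                        // drop unnecessary records
      slimOrders.join(customers, "customer_id")      // join on the shared key
    }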

What is a good number of partitions in Spark?

The general recommendation for Spark is to have 4x as many partitions as the number of cores available to the application; as an upper bound on the partition count, each task should take at least 100 ms to execute.
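A rough sketch of that heuristic, assuming an existing SparkContext sc and HiveContext hiveContext; defaultParallelism typically reflects the total cores available to the application:

    // 4x the core count as a starting point for the shuffle partition count.
    val targetPartitions = sc.defaultParallelism * 4
    hiveContext.setConf("spark.sql.shuffle.partitions", targetPartitions.toString)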

How do I increase the number of partitions in Spark?

If you want to increase the number of partitions of your DataFrame, all you need to run is the repartition() function. It returns a new DataFrame partitioned by the given partitioning expressions; the resulting DataFrame is hash partitioned.
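For example, assuming a hypothetical DataFrame df:

    // Repartition into a fixed number of partitions.
    val wide = df.repartition(1000)

    // In Spark 1.6+ you can also repartition by column expressions,
    // which hash-partitions the data on those columns:
    // val byKey = df.repartition(df("customer_id"))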


1 Answer

If you're running out of memory on the shuffle, try setting spark.sql.shuffle.partitions to 2001.
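With an existing HiveContext, that is a one-line change (a sketch mirroring the setting mentioned in the question):

    // 2001 crosses the 2000-partition threshold described below.
    hiveContext.setConf("spark.sql.shuffle.partitions", "2001")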

Spark uses a different data structure for shuffle book-keeping when the number of partitions is greater than 2000:

    private[spark] object MapStatus {
      def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
        if (uncompressedSizes.length > 2000) {
          HighlyCompressedMapStatus(loc, uncompressedSizes)
        } else {
          new CompressedMapStatus(loc, uncompressedSizes)
        }
      }
      ...

I really wish they would let you configure this independently.

By the way, I found this information in a Cloudera slide deck.

answered Oct 12 '22 by nont