I am using Spark SQL (actually hiveContext.sql()) to run group by queries, and I am running into OOM issues. I tried increasing spark.sql.shuffle.partitions from the default of 200 to 1000, but it is not helping.
My understanding is that the shuffle load is spread across these partitions, so more partitions means less data for each one to hold. I am new to Spark. I am using Spark 1.4.0 and have around 1 TB of uncompressed data to process with hiveContext.sql() group by queries.
The next strategy is to reduce the total amount of data being shuffled. A few things you can do: drop the columns you don't need, filter out unnecessary records, and optimize data ingestion. Denormalize the datasets, particularly if the shuffle is caused by a join.
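As a rough illustration of trimming the shuffle input, the sketch below selects only the grouping and aggregated columns and filters rows before the group by. The table name `events` and the columns `user_id`, `amount` and `event_date` are hypothetical placeholders, not names from the question, and the code assumes the Spark 1.x HiveContext API.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.hive.HiveContext

object ReduceShuffleInput {
  // Hypothetical table (events) and columns (user_id, amount, event_date).
  // Only the columns the aggregation needs are referenced, and the WHERE
  // clause drops unneeded rows before they reach the shuffle.
  def totalsPerUser(hiveContext: HiveContext): DataFrame =
    hiveContext.sql(
      """SELECT user_id, SUM(amount) AS total_amount
        |FROM events
        |WHERE event_date >= '2015-06-01'
        |GROUP BY user_id""".stripMargin)
}
```

The same idea applies to joins: narrowing and filtering each side before the join keeps the shuffled rows smaller.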
The general recommendation for Spark is to have about 4x as many partitions as there are cores available to the application in the cluster. As an upper bound on the partition count, each task should still take at least 100 ms to execute.
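The rule of thumb above can be turned into a small back-of-the-envelope helper. This is only a sketch: the object and method names are made up for illustration, and the per-task estimate assumes the work is spread evenly across partitions, which real (skewed) data often violates.

```scala
object PartitionSizing {
  // Suggested partition count: cores available to the application
  // times a multiplier (4 by default, per the rule of thumb).
  def suggestedPartitions(totalCores: Int, partitionsPerCore: Int = 4): Int = {
    require(totalCores > 0, "totalCores must be positive")
    require(partitionsPerCore > 0, "partitionsPerCore must be positive")
    totalCores * partitionsPerCore
  }

  // Rough check of the upper bound: with work spread evenly, does each
  // task still run for at least minTaskMs (100 ms by default)?
  def tasksLongEnough(totalWorkMs: Long, partitions: Int, minTaskMs: Long = 100L): Boolean = {
    require(partitions > 0, "partitions must be positive")
    totalWorkMs / partitions >= minTaskMs
  }
}
```

For example, `PartitionSizing.suggestedPartitions(50)` gives 200, and `PartitionSizing.tasksLongEnough(60000L, 200)` holds because each task would get roughly 300 ms of work.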
If you want to increase the number of partitions of your DataFrame, all you need to do is call the repartition() function. It returns a new DataFrame partitioned by the given partitioning expressions, and the resulting DataFrame is hash partitioned.
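A minimal sketch of calling repartition() on an existing DataFrame follows. The wrapper object and method name are illustrative only. It uses the `repartition(numPartitions)` overload; as far as I know, the overloads that take partitioning expressions arrived in later releases than the Spark 1.4.0 mentioned in the question, so treat that as something to verify against your version.

```scala
import org.apache.spark.sql.DataFrame

object RepartitionExample {
  // Returns a new DataFrame with the requested number of partitions.
  // This triggers a full shuffle of the data.
  def widen(df: DataFrame, numPartitions: Int): DataFrame = {
    require(numPartitions > 0, "numPartitions must be positive")
    df.repartition(numPartitions)
  }
}
```

Note that this changes the partitioning of the DataFrame itself; the number of partitions produced by a group by inside hiveContext.sql() is still governed by spark.sql.shuffle.partitions.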
If you're running out of memory on the shuffle, try setting spark.sql.shuffle.partitions to 2001.
Spark uses a different data structure for shuffle book-keeping when the number of partitions is greater than 2000:
    private[spark] object MapStatus {
      def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
        if (uncompressedSizes.length > 2000) {
          HighlyCompressedMapStatus(loc, uncompressedSizes)
        } else {
          new CompressedMapStatus(loc, uncompressedSizes)
        }
      }
      ...
I really wish they would let you configure this independently.
By the way, I found this information in a Cloudera slide deck.
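For completeness, here is one way the 2001 setting could be applied with a HiveContext on Spark 1.x before running the group by. This is a sketch under assumptions: the application name, the table `my_table` and the column `key_col` are placeholders, and the master URL is expected to come from spark-submit.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object ShufflePartitionsExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("shuffle-partitions-example"))
    val hiveContext = new HiveContext(sc)

    // Cross the 2000-partition threshold so map statuses use the
    // more compact HighlyCompressedMapStatus representation.
    hiveContext.setConf("spark.sql.shuffle.partitions", "2001")

    // Hypothetical table and column names.
    val counts = hiveContext.sql(
      "SELECT key_col, COUNT(*) AS cnt FROM my_table GROUP BY key_col")
    counts.show()

    sc.stop()
  }
}
```

The same value can also be set from SQL with `hiveContext.sql("SET spark.sql.shuffle.partitions=2001")`.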