When decreasing the number of partitions one can use coalesce, which is great because it doesn't cause a shuffle and seems to work instantly (it doesn't require an additional job stage).
I would like to do the opposite sometimes, but repartition induces a shuffle. I think a few months ago I actually got this working by using CoalescedRDD with balanceSlack = 1.0 - what would happen is it would split a partition so that the resulting partitions' locations were all on the same node (so very little network IO).
This kind of functionality is automatic in Hadoop; one just tweaks the split size. It doesn't seem to work this way in Spark unless one is decreasing the number of partitions. I think the solution might be to write a custom partitioner along with a custom RDD where we define getPreferredLocations... but I thought that is such a simple and common thing to do that surely there must be a straightforward way of doing it?
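Roughly, the getPreferredLocations piece is what I imagine looking like the untested sketch below (the class name and the preferredHosts map are made up for illustration; the actual partition-splitting logic would still be needed on top of this):

```scala
import scala.reflect.ClassTag

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Untested sketch: a pass-through RDD that only overrides getPreferredLocations,
// pinning each partition to hosts of our choosing. Names are illustrative.
class PreferredLocationRDD[T: ClassTag](
    prev: RDD[T],
    preferredHosts: Map[Int, Seq[String]] // partition index -> preferred hosts
) extends RDD[T](prev) {

  // Keep the parent's partitioning untouched.
  override protected def getPartitions: Array[Partition] = prev.partitions

  // Read straight through to the parent partition.
  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    prev.iterator(split, context)

  // The interesting part: tell the scheduler where each partition should run.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    preferredHosts.getOrElse(split.index, Nil)
}
```

The hard part, of course, would be computing preferredHosts so that the new, smaller partitions end up on the nodes that already hold the parent partitions' data.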
Things tried:
.set("spark.default.parallelism", partitions) on my SparkConf, and, in the context of reading parquet, sqlContext.sql("set spark.sql.shuffle.partitions= ..."), which on 1.0.0 causes an error and isn't really what I want anyway - I want the partition number to change across all types of jobs, not just shuffles.
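For reference, a sketch of what those two attempts look like (partitions here is a hypothetical Int, and the SET command errors on 1.0.0 as noted):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical target partition count.
val partitions = 64

val conf = new SparkConf()
  .setAppName("partition-count-experiments")
  // Only a default for shuffles and for parallelize() calls without an explicit count.
  .set("spark.default.parallelism", partitions.toString)

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Only affects how many partitions SQL shuffle stages produce.
sqlContext.sql(s"SET spark.sql.shuffle.partitions=$partitions")
```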
If you want to increase the number of partitions of your DataFrame, all you need to run is the repartition() function, which returns a new DataFrame partitioned by the given partitioning expressions. The resulting DataFrame is hash partitioned.
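For example (a sketch; df and the column name are hypothetical, and the numbers are illustrative):

```scala
// Increase the partition count of a DataFrame; this does trigger a shuffle.
val morePartitions = df.repartition(400)

// Or partition by an expression; the result is hash partitioned on that column.
val byCustomer = df.repartition(400, df("customer_id"))

println(morePartitions.rdd.getNumPartitions) // 400
```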
One way to avoid shuffles when joining two datasets is to take advantage of broadcast variables. When one of the datasets is small enough to fit in memory in a single executor, it can be loaded into a hash table on the driver and then broadcast to every executor.
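A sketch of that pattern at the RDD level (names and types are made up; assume smallRdd genuinely fits in memory):

```scala
// `smallRdd: RDD[(String, Int)]` and `largeRdd: RDD[(String, String)]` are hypothetical.
// Build a hash table from the small dataset on the driver...
val smallMap = smallRdd.collectAsMap()

// ...broadcast it once to every executor...
val smallBc = sc.broadcast(smallMap)

// ...and join against it without shuffling the large dataset.
val joined = largeRdd.flatMap { case (key, value) =>
  smallBc.value.get(key).map(small => (key, (value, small)))
}
```

With DataFrames, the same idea is largeDF.join(broadcast(smallDF), Seq("key")) using org.apache.spark.sql.functions.broadcast.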
Coalesce doesn't involve a full shuffle. If the number of partitions is reduced from 5 to 2, coalesce will keep the data already on 2 of the executors in place and only move the data from the remaining 3 executors onto those 2, thereby avoiding a full shuffle.
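A sketch of that 5-to-2 case (numbers are illustrative):

```scala
val rdd = sc.parallelize(1 to 1000, numSlices = 5)

// Narrow dependency: existing partitions are merged in place, no full shuffle.
val two = rdd.coalesce(2)

// For comparison: repartition(n) is coalesce(n, shuffle = true) and does a full shuffle.
val twoShuffled = rdd.repartition(2)

println(two.getNumPartitions) // 2
```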
Spark RDD shuffle: although reduceByKey() triggers a data shuffle, it doesn't change the partition count, because RDDs inherit the partition count from their parent RDD. You may get different partition counts depending on your setup and how Spark creates partitions.
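A sketch showing the inherited partition count (assuming spark.default.parallelism isn't set; values are illustrative):

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("c", 4)), numSlices = 4)

println(pairs.getNumPartitions)                       // 4
println(pairs.reduceByKey(_ + _).getNumPartitions)    // 4, inherited from the parent RDD
println(pairs.reduceByKey(_ + _, 8).getNumPartitions) // 8 when a count is passed explicitly
```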
Spark default shuffle partitions: a DataFrame's partition count jumps to 200 automatically when a Spark operation performs data shuffling (join(), union(), aggregation functions). This default shuffle partition number comes from the Spark SQL configuration spark.sql.shuffle.partitions, which is set to 200 by default.
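A sketch of changing that default (the DataFrames and the join column are hypothetical; assumes a SparkSession named spark):

```scala
// Lower the shuffle partition count from the default of 200 for subsequent SQL shuffles.
spark.conf.set("spark.sql.shuffle.partitions", "50")

val joined = ordersDF.join(customersDF, "customer_id") // the join forces a shuffle
println(joined.rdd.getNumPartitions)                   // 50 instead of 200 (with AQE disabled)
```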
Shuffle operations repartition Spark data frames, so the partition count of the result can differ from that of the original data frame. Moving data from one partition to another in order to match up, aggregate, join, or spread it out in other ways is called a shuffle. Important points to note about shuffle in Spark:
1. The number of Spark shuffle partitions is static, set by configuration.
2. Shuffle partitions do not change with the size of the data.
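A sketch illustrating both points (assumes a spark-shell style SparkSession named spark, with adaptive query execution disabled so the raw shuffle partition count is visible):

```scala
import spark.implicits._

spark.conf.set("spark.sql.adaptive.enabled", "false")

val small = spark.range(1000).groupBy($"id" % 10).count()
val large = spark.range(10000000L).groupBy($"id" % 10).count()

println(small.rdd.getNumPartitions) // 200 (the default), regardless of input size
println(large.rdd.getNumPartitions) // 200 as well
```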
Watch this space:
https://issues.apache.org/jira/browse/SPARK-5997
This kind of really simple, obvious feature will eventually be implemented - I guess just after they finish all the unnecessary features in Datasets.