In many posts, whenever a question touches on shuffling or partitioning (because of a JOIN, an aggregation, etc.), a statement like the one below appears in some form or another:
... In general whenever you do a spark sql aggregation or join which shuffles data this is the number of resulting partitions = 200. This is set by spark.sql.shuffle.partitions. ...
So, my question is: does spark.sql.shuffle.partitions actually determine the number of partitions that result from a shuffle (JOIN, aggregation) on a Dataset as well?
I ask as I have never seen a clear viewpoint on this.
I did the following test:
// generated a DS of some 20M short rows
df0.count
val ds1 = df0.repartition(765)
ds1.count
val ds2 = df0.repartition(765)
ds2.count
sqlContext.setConf("spark.sql.shuffle.partitions", "765")
// The line above was not included on the 1st run; it was included on the 2nd run.
ds1.rdd.partitions.size
ds2.rdd.partitions.size
val joined = ds1.join(ds2, ds1("time_asc") === ds2("time_asc"), "outer")
joined.rdd.partitions.size
joined.count
joined.rdd.partitions.size
On the 1st test, without setting sqlContext.setConf("spark.sql.shuffle.partitions", "765"), the join was processed with, and resulted in, 200 partitions. This despite SO post 45704156 stating the setting may not apply to DFs; and this is a DS in any case.
On the 2nd test, with sqlContext.setConf("spark.sql.shuffle.partitions", "765") set, the join was processed with, and resulted in, 765 partitions. Again, despite SO post 45704156 stating the setting may not apply to DFs, and this being a DS.
The number of shuffle partitions is controlled by spark.sql.shuffle.partitions, which is set to 200 by default. You can change this default using the conf method of the SparkSession object or via spark-submit configuration.
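As a minimal sketch of both ways (765 here simply mirrors the value used in the test above; any positive integer works):

// At runtime, via the SparkSession conf
spark.conf.set("spark.sql.shuffle.partitions", "765")

// Or via the older SQLContext API used in the question
sqlContext.setConf("spark.sql.shuffle.partitions", "765")

// Or at submit time:
// spark-submit --conf spark.sql.shuffle.partitions=765 ...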
The Spark shuffle is a mechanism for redistributing or re-partitioning data so that the data is grouped differently across partitions. A shuffle is a very expensive operation, as it moves data between executors or even between worker nodes in the cluster.
Usually, the number of partitions should be 1x to 4x the number of cores you have in order to get good performance (which also means that creating a cluster sized to match your data scale is important).
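As a rough sketch of that rule of thumb (the 3x multiplier is just an arbitrary pick within the 1x-4x range):

// defaultParallelism is typically the total number of executor cores
val totalCores = spark.sparkContext.defaultParallelism
// choose a multiplier between 1 and 4; 3 is only an example
spark.conf.set("spark.sql.shuffle.partitions", (totalCores * 3).toString)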
By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value.
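For example (the HDFS path is hypothetical), the optional minPartitions argument of textFile asks for more partitions than the block count alone would give:

// one partition per 128 MB HDFS block by default
val events = sc.textFile("hdfs:///data/events.txt")
events.getNumPartitions

// ask for at least 400 partitions instead
val events400 = sc.textFile("hdfs:///data/events.txt", minPartitions = 400)
events400.getNumPartitions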
It is a combination of both your guesses.
Assume you have a set of input data with M partitions and you set shuffle partitions to N.
When executing a join, Spark reads your input data in all M partitions and re-shuffles the data, based on the key, into N partitions. Imagine a trivial hash partitioner: the hash function applied to the key looks roughly like A = hashcode(key) % N, and the data is then re-allocated to the node in charge of handling the Ath partition. Each node can be in charge of handling multiple partitions.
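A simplified sketch of that routing (Spark's real HashPartitioner works essentially like this, with a guard because hashCode can be negative on the JVM):

// which of the N shuffle partitions a given key lands in
def targetPartition(key: Any, numPartitions: Int): Int = {
  val raw = key.hashCode % numPartitions
  if (raw < 0) raw + numPartitions else raw   // fold negatives into [0, numPartitions)
}

targetPartition("some_join_key", 765)   // a value in [0, 765); the node owning that partition handles the row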
After shuffling, the nodes aggregate the data in the partitions they are in charge of. As no additional shuffling needs to happen here, they can produce the output directly.
So, in summary, your output will be coalesced to N partitions; however, it is coalesced because it is processed in N partitions, not because Spark applies one additional shuffle stage specifically to repartition your output data to N.
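The same behaviour can be checked with an aggregation instead of a join (a sketch reusing ds1 from the question; it assumes adaptive query execution is not coalescing shuffle partitions, as in the Spark shell used above):

sqlContext.setConf("spark.sql.shuffle.partitions", "765")

val counts = ds1.groupBy("time_asc").count()   // groupBy forces a shuffle
counts.rdd.partitions.size                     // 765: the stage itself ran with N partitions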
spark.sql.shuffle.partitions is the parameter that decides the number of partitions when shuffles happen, e.g. for joins or aggregations, i.e. wherever data moves across the nodes. The other setting, spark.default.parallelism, is calculated based on your data size and the maximum block size (128 MB in HDFS). So if your job does not do any shuffle, it will use the default parallelism value, or, if you are using RDDs, you can set the partition count yourself. When a shuffle happens, it will use 200.
val df = sc.parallelize(List(1, 2, 3, 4, 5), 4).toDF()
df.count()              // this will use 4 partitions

val df1 = df
df1.except(df).count()  // will generate 200 partitions, running in 2 stages