
spark.sql.shuffle.partitions of 200 default partitions conundrum

Tags:

apache-spark

In many posts there is a statement, in some form or another like the one quoted below, given in answer to questions on shuffling and partitioning caused by a JOIN, AGGR, etc.:

... In general whenever you do a spark sql aggregation or join which shuffles data this is the number of resulting partitions = 200. This is set by spark.sql.shuffle.partitions. ...

So, my question is:

  • Do we mean that if we have set partitioning at 765 for a DF, for example,
  • that the processing occurs against 765 partitions, but that the output is coalesced / re-partitioned as standard to 200 - referring here to the word resulting?
  • Or does it do the processing using 200 partitions, after coalescing / re-partitioning to 200 partitions before the JOIN or AGGR?

I ask as I never see a clear viewpoint.

I did the following test:

// genned a DS of some 20M short rows
df0.count
val ds1 = df0.repartition(765)
ds1.count
val ds2 = df0.repartition(765)
ds2.count

sqlContext.setConf("spark.sql.shuffle.partitions", "765")
// The setConf line above was not included on the 1st run; it was included on the 2nd run.

ds1.rdd.partitions.size
ds2.rdd.partitions.size

val joined = ds1.join(ds2, ds1("time_asc") === ds2("time_asc"), "outer") 
joined.rdd.partitions.size
joined.count
joined.rdd.partitions.size

On the 1st test - not defining sqlContext.setConf("spark.sql.shuffle.partitions", "765") - the processing used 200 partitions and the result had 200 partitions. Even though SO post 45704156 states it may not apply to DFs - and this is a DS.

On the 2nd test - with sqlContext.setConf("spark.sql.shuffle.partitions", "765") defined - the processing used 765 partitions and the result had 765 partitions. Again, even though SO post 45704156 states it may not apply to DFs - and this is a DS.

asked Aug 21 '18 at 13:08 by thebluephantom


People also ask

What should be the value of Spark SQL shuffle partitions?

The number of shuffle partitions is controlled by spark.sql.shuffle.partitions, which is set to 200 by default. You can change this default shuffle partition value using the conf method of the SparkSession object or via spark-submit command configurations.
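
For example, a minimal sketch of both approaches (assuming a Spark 2.x spark-shell session, where spark is the SparkSession, and an illustrative value of 300):

// Set it at runtime through the SparkSession conf:
spark.conf.set("spark.sql.shuffle.partitions", "300")
spark.conf.get("spark.sql.shuffle.partitions")   // now "300" for subsequent shuffles

// Or pass it at submit time (shown here as a comment):
// spark-submit --conf spark.sql.shuffle.partitions=300 ...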

What does Spark SQL shuffle partitions do?

The Spark shuffle is a mechanism for redistributing or re-partitioning data so that the data is grouped differently across partitions. A Spark shuffle is a very expensive operation, as it moves data between executors or even between worker nodes in a cluster.

How do I choose the number of shuffle partitions?

Usually, the number of partitions should be 1x to 4x the number of cores you have in order to gain optimized performance (which also means that creating a cluster that matches your data scale is important).
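
As a rough sketch of that rule of thumb (assuming a spark-shell session; sc.defaultParallelism is typically close to the total number of cores available to the application, and the 2x factor is just an illustrative choice within the 1x-4x band):

val totalCores = sc.defaultParallelism              // approximates the cores available
val targetShufflePartitions = totalCores * 2        // somewhere in the 1x-4x range
spark.conf.set("spark.sql.shuffle.partitions", targetShufflePartitions.toString)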

How many partitions does Spark create by default?

By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value.
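
A minimal illustration of that default, and of asking for more partitions (the HDFS path is hypothetical; the second argument of textFile is minPartitions):

val logs = sc.textFile("hdfs:///data/events.log")           // roughly one partition per 128 MB block
logs.getNumPartitions

val logsSplit = sc.textFile("hdfs:///data/events.log", 500) // ask for at least 500 partitions
logsSplit.getNumPartitions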


2 Answers

It is a combination of both your guesses.

Assume you have a set of input data with M partitions and you set shuffle partitions to N.

When executing a join, Spark reads your input data in all M partitions and re-shuffles the data based on the key into N partitions. Imagine a trivial hash partitioner: the hash function applied to the key looks pretty much like A = hashcode(key) % N, and then this data is re-allocated to the node in charge of handling the Ath partition. Each node can be in charge of handling multiple partitions.
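
A minimal Scala sketch of that idea (not Spark's exact internals, just the hashcode-modulo-N assignment described above, with the sign handled so the result stays in [0, N)):

val N = 200                                   // spark.sql.shuffle.partitions
def targetPartition(key: Any): Int = {
  val h = key.hashCode % N                    // A = hashcode(key) % N
  if (h < 0) h + N else h                     // keep the partition id non-negative
}
targetPartition("2018-08-21 13:08:00")        // some id in [0, N); rows with equal keys land in the same partition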

After shuffling, the nodes will work to aggregate the data in partitions they are in charge of. As no additional shuffling needs to be done here, the nodes can produce the output directly.

So in summary, your output will be coalesced to N partitions. However, it is coalesced because it is processed in N partitions, not because Spark applies an additional shuffle stage specifically to repartition your output data to N.
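
You can see this in the question's own test (a sketch reusing the poster's ds1 and ds2, assuming a Spark 2.x shell): the join result comes back in exactly N partitions, and the physical plan typically shows only the hash exchanges feeding the join, with no extra repartition of the output:

sqlContext.setConf("spark.sql.shuffle.partitions", "765")
val joined = ds1.join(ds2, ds1("time_asc") === ds2("time_asc"), "outer")
joined.rdd.getNumPartitions   // 765
joined.explain()              // Exchange hashpartitioning(time_asc, 765) on each input, nothing after the join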

answered Oct 23 '22 at 06:10 by HaoYuan


spark.sql.shuffle.partitions is the parameter that decides the number of partitions used while doing shuffles such as joins or aggregations, i.e. where data movement occurs across the nodes. The other part, spark.default.parallelism, will be calculated on the basis of your data size and the maximum block size; in HDFS that is 128 MB. So if your job does not do any shuffle, it will use the default parallelism value, or, if you are using RDDs, you can set it yourself. When shuffling happens, it will take 200.

val df = sc.parallelize(List(1, 2, 3, 4, 5), 4).toDF()
df.count()            // this will use 4 partitions

val df1 = df
df1.except(df).count  // will generate 200 partitions, across 2 stages
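
As a quick sketch of inspecting the two settings contrasted above (assuming a Spark 2.x spark-shell session, where spark is the SparkSession):

sc.defaultParallelism                             // spark.default.parallelism side, used when no shuffle is involved
spark.conf.get("spark.sql.shuffle.partitions")    // "200" unless overridden, used for SQL shuffles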

answered Oct 23 '22 at 07:10 by Chandan Ray