
Why do I get so many empty partitions when repartitioning a Spark DataFrame?

I want to partition a dataframe "df1" on 3 columns. This dataframe has exactly 990 unique combinations of those 3 columns:

In [17]: df1.createOrReplaceTempView("df1_view")

In [18]: spark.sql("select count(*) from (select distinct(col1,col2,col3) from df1_view) as t").show()
+--------+                                                                      
|count(1)|
+--------+
|     990|
+--------+

In order to optimize the processing of this dataframe, I want to repartition df1 so that I get 990 partitions, one for each possible key:

In [19]: df1.rdd.getNumPartitions()
Out[19]: 24

In [20]: df2 = df1.repartition(990, "col1", "col2", "col3")

In [21]: df2.rdd.getNumPartitions()
Out[21]: 990

I wrote a simple way to count rows in each partition:

In [22]: def f(iterator):
    ...:     # the iterator yields the rows of one partition; count them
    ...:     a = 0
    ...:     for row in iterator:
    ...:         a = a + 1
    ...:     print(a)
    ...: 

In [23]: df2.foreachPartition(f)

What I actually get is 628 partitions holding one or more key values, and 362 empty partitions.
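
As a side note, the same per-partition counts can be pulled back to the driver with spark_partition_id() instead of reading the print output where the tasks run; a minimal sketch reusing df2 from above:

from pyspark.sql.functions import spark_partition_id

# Tag each row with the id of the partition it lives in,
# then count rows per partition on the driver.
per_partition_counts = (
    df2.withColumn("pid", spark_partition_id())
       .groupBy("pid")
       .count()
)

# Partitions holding no rows simply never appear here, so the number
# of distinct pids is the number of non-empty partitions.
per_partition_counts.orderBy("pid").show(1000)
print(per_partition_counts.count())   # 628 non-empty partitions out of 990 here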

I assumed Spark would repartition evenly (1 key value = 1 partition), but that does not seem to be the case, and I feel like this repartitioning adds data skew when it should do the opposite...

What algorithm does Spark use to partition a dataframe on columns? Is there a way to achieve what I thought was possible?

I'm using Spark 2.2.0 on Cloudera.

asked Jun 05 '18 by Tomcat


People also ask

How do I reduce the number of partitions in Spark?

Spark RDD coalesce() is used only to reduce the number of partitions. It is an optimized version of repartition() in which the amount of data moved across partitions is lower.

How many partitions we get when we create Spark DataFrame?

Spark/PySpark creates a task for each partition. Spark Shuffle operations move the data from one partition to other partitions. By default, DataFrame shuffle operations create 200 partitions.
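
For illustration, a minimal sketch of where that 200 comes from, reusing the spark session and df1 from the question; the value is controlled by spark.sql.shuffle.partitions:

# groupBy triggers a shuffle, so the result uses the default
# spark.sql.shuffle.partitions (200) output partitions.
print(spark.conf.get("spark.sql.shuffle.partitions"))   # '200'

grouped = df1.groupBy("col1").count()
print(grouped.rdd.getNumPartitions())                   # 200

# The default can be changed for the session:
spark.conf.set("spark.sql.shuffle.partitions", "990")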

Can coalesce increase partitions in Spark?

You can try to increase the number of partitions with coalesce, but it won't work: a DataFrame with four partitions keeps four partitions even if you call coalesce(6). The coalesce algorithm changes the number of partitions by moving data from some partitions into existing ones.

Which is better coalesce or repartition in Spark?

The repartition() can be used to increase or decrease the number of partitions, but it involves heavy data shuffling across the cluster. On the other hand, coalesce() can be used only to decrease the number of partitions. In most cases, coalesce() does not trigger a shuffle.
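
A small sketch of that difference, assuming the 24-partition df1 from the question above:

# repartition() can grow or shrink the partition count (full shuffle):
print(df1.repartition(48).rdd.getNumPartitions())   # 48
print(df1.repartition(6).rdd.getNumPartitions())    # 6

# coalesce() only merges existing partitions, so it can only shrink;
# asking for more partitions than currently exist is a no-op:
df_less = df1.coalesce(6)
print(df_less.rdd.getNumPartitions())               # 6
print(df_less.coalesce(12).rdd.getNumPartitions())  # still 6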


1 Answer

To distribute data across partitions, Spark somehow needs to convert the value of the column(s) into a partition index. There are two default partitioners in Spark - HashPartitioner and RangePartitioner. Different transformations can apply different partitioners - e.g. join applies the hash partitioner.

Basically, for the hash partitioner the formula that converts a value into a partition index is value.hashCode() % numOfPartitions (for DataFrames it is a Murmur3 hash of the partitioning columns rather than hashCode, but the principle is the same). In your case multiple key values map to the same partition index, which is why some partitions end up empty.
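
If I read Spark's hash partitioning correctly, for a DataFrame repartition(n, cols) the partition index is pmod(hash(cols), n), where hash is the Murmur3 hash also exposed as the SQL hash() function, so the collisions can be reproduced directly. A sketch using df1 and the column names from the question (throwing 990 keys into 990 buckets at random leaves roughly 990/e, about 364, buckets empty, which matches the ~362 empty partitions observed):

from pyspark.sql import functions as F

# Compute the partition index each distinct key would be sent to,
# then count how many keys collide into the same index.
keys = df1.select("col1", "col2", "col3").distinct()

collisions = (
    keys.withColumn("pid", F.expr("pmod(hash(col1, col2, col3), 990)"))
        .groupBy("pid")
        .count()          # number of distinct keys landing in each partition
)

collisions.orderBy(F.desc("count")).show()
print(990 - collisions.count())   # partition indices that receive no key at all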

You could implement your own partitioner if you want a better distribution. More about it is here and here and here.
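
As a hypothetical sketch (the helper names are mine, and it drops down to the RDD API because the DataFrame API does not let you plug in a custom partitioner), one way to force exactly one key combination per partition:

# Collect the 990 distinct keys and give each one its own partition index.
distinct_keys = [tuple(r) for r in
                 df1.select("col1", "col2", "col3").distinct().collect()]
key_to_index = {k: i for i, k in enumerate(distinct_keys)}

# Key each row by its (col1, col2, col3) combination, then partition
# with a function that maps every key to its dedicated index.
keyed = df1.rdd.keyBy(lambda row: (row["col1"], row["col2"], row["col3"]))
partitioned = keyed.partitionBy(len(distinct_keys), lambda k: key_to_index[k])

rows_only = partitioned.values()
print(rows_only.getNumPartitions())   # 990, exactly one key per partition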

answered Sep 23 '22 by Vladislav Varslavans