I try to repartition a DataFrame according to a column the the DataFrame has N
(let say N=3
) different values in the partition-column x
, e.g:
val myDF = sc.parallelize(Seq(1,1,2,2,3,3)).toDF("x") // create dummy data
What I like to achieve is to repartiton myDF
by x
without producing empty partitions. Is there a better way than doing this?
val numParts = myDF.select($"x").distinct().count.toInt
myDF.repartition(numParts,$"x")
(If I don't specify numParts
in repartiton
, most of my partitions are empty (as repartition
creates 200 partitions) ...)
There isn't an easy way to simply delete the empty partitions from a RDD. coalesce doesn't guarantee that the empty partitions will be deleted. If you have a RDD with 40 blank partitions and 10 partitions with data, there will still be empty partitions after rdd. coalesce(45) .
Spark RDD coalesce() is used only to reduce the number of partitions. This is optimized or improved version of repartition() where the movement of the data across the partitions is lower using coalesce.
Coalesce will not move data in 2 executors and move the data from the remaining 3 executors to the 2 executors. Thereby avoiding a full shuffle. Because of the above reason the partition size vary by a high degree. Since full shuffle is avoided, coalesce is more performant than repartition.
If you want to increase the partitions of your DataFrame, all you need to run is the repartition() function. Returns a new DataFrame partitioned by the given partitioning expressions. The resulting DataFrame is hash partitioned.
I'd think of solution with iterating over df
partition and fetching record count in it to find non-empty partitions.
val nonEmptyPart = sparkContext.longAccumulator("nonEmptyPart")
df.foreachPartition(partition =>
if (partition.length > 0) nonEmptyPart.add(1))
As we got non-empty partitions (nonEmptyPart
), we can clean empty partitions by using coalesce()
(check coalesce() vs repartition()).
val finalDf = df.coalesce(nonEmptyPart.value.toInt) //coalesce() accepts only Int type
It may or may not be the best, but this solution will avoid shuffling as we are not using repartition()
val df1 = sc.parallelize(Seq(1, 1, 2, 2, 3, 3)).toDF("x").repartition($"x")
val nonEmptyPart = sc.longAccumulator("nonEmptyPart")
df1.foreachPartition(partition =>
if (partition.length > 0) nonEmptyPart.add(1))
val finalDf = df1.coalesce(nonEmptyPart.value.toInt)
println(s"nonEmptyPart => ${nonEmptyPart.value.toInt}")
println(s"df.rdd.partitions.length => ${df1.rdd.partitions.length}")
println(s"finalDf.rdd.partitions.length => ${finalDf.rdd.partitions.length}")
Output
nonEmptyPart => 3
df.rdd.partitions.length => 200
finalDf.rdd.partitions.length => 3
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With