 

Dropping empty DataFrame partitions in Apache Spark

I'm trying to repartition a DataFrame according to a column. The DataFrame has N (let's say N=3) different values in the partition column x, e.g.:

val myDF = sc.parallelize(Seq(1,1,2,2,3,3)).toDF("x") // create dummy data

What I'd like to achieve is to repartition myDF by x without producing empty partitions. Is there a better way than doing this?

val numParts = myDF.select($"x").distinct().count.toInt
myDF.repartition(numParts,$"x")

(If I don't specify numParts in repartition, most of my partitions are empty, as repartition creates 200 partitions by default ...)

asked Jan 25 '17 by Raphael Roth


People also ask

How do I remove a blank partition in Spark?

There isn't an easy way to simply delete the empty partitions from an RDD. coalesce doesn't guarantee that the empty partitions will be deleted. If you have an RDD with 40 blank partitions and 10 partitions with data, there will still be empty partitions after rdd.coalesce(45).
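A minimal sketch of that behaviour (assuming a spark-shell session with sc in scope; the partition counts are illustrative):

// 10 elements spread over 50 partitions => at least 40 partitions are empty
val rdd = sc.parallelize(1 to 10, 50)

def emptyCount(r: org.apache.spark.rdd.RDD[Int]): Long =
  r.mapPartitions(it => Iterator(it.isEmpty)).filter(identity).count

println(emptyCount(rdd))              // 40
println(emptyCount(rdd.coalesce(45))) // still well above 0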

How do I reduce the number of partitions in Spark?

Spark RDD coalesce() is used only to reduce the number of partitions. It is an optimized version of repartition(), in which the movement of data across partitions is lower.
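A quick illustration (again assuming a spark-shell session; the partition counts are arbitrary):

val rdd = sc.parallelize(1 to 100, 8)

println(rdd.coalesce(4).getNumPartitions)     // 4: partitions merged, no full shuffle
println(rdd.coalesce(16).getNumPartitions)    // still 8: plain coalesce cannot increase partitions
println(rdd.repartition(16).getNumPartitions) // 16: repartition shuffles to grow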

Why coalesce is better than repartition in Spark?

With, say, 5 executors, coalesce will leave the data on 2 of them in place and move the data from the remaining 3 executors onto those 2, thereby avoiding a full shuffle. For the same reason, the resulting partition sizes can vary by a high degree. Since a full shuffle is avoided, coalesce is more performant than repartition.
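A small sketch of that skew (the inflated partition is artificial, purely to make the imbalance visible):

// Make partition 0 much heavier than the others
val skewed = sc.parallelize(1 to 10, 5).mapPartitionsWithIndex {
  case (0, it) => it ++ Iterator.fill(1000)(0)
  case (_, it) => it
}

def sizes(r: org.apache.spark.rdd.RDD[Int]): List[Int] =
  r.mapPartitions(it => Iterator(it.size)).collect.toList

println(sizes(skewed.coalesce(2)))    // uneven: whole existing partitions are merged as-is
println(sizes(skewed.repartition(2))) // roughly even after the full shuffle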

How do I change the number of partitions in a Spark data frame?

If you want to increase the number of partitions of your DataFrame, all you need to run is the repartition() function. It returns a new DataFrame partitioned by the given partitioning expressions; the resulting DataFrame is hash partitioned.
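For example (assuming a spark-shell session, so spark and the $ column syntax are in scope):

val df = spark.range(100).toDF("id")
println(df.rdd.getNumPartitions)    // whatever the default parallelism gives

val grown = df.repartition(16)      // full shuffle into 16 partitions
println(grown.rdd.getNumPartitions) // 16

val byCol = df.repartition($"id")   // hash partitioned by the given expression
println(byCol.rdd.getNumPartitions) // spark.sql.shuffle.partitions (200 by default)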


1 Answer

One approach is to iterate over the DataFrame's partitions and count the records in each one to find the non-empty partitions.

// Accumulator counting non-empty partitions across the executors
val nonEmptyPart = sparkContext.longAccumulator("nonEmptyPart")

// foreachPartition is an action, so the accumulator value is usable right after
df.foreachPartition(partition =>
  if (partition.nonEmpty) nonEmptyPart.add(1)) // nonEmpty just checks hasNext

Once we have the number of non-empty partitions (nonEmptyPart), we can drop the empty ones by using coalesce() (check coalesce() vs. repartition()).

val finalDf = df.coalesce(nonEmptyPart.value.toInt) //coalesce() accepts only Int type

It may or may not be the best approach, but this solution avoids a full shuffle, since we are not using repartition().


Example to address comment

// repartition($"x") creates spark.sql.shuffle.partitions partitions (200 by default)
val df1 = sc.parallelize(Seq(1, 1, 2, 2, 3, 3)).toDF("x").repartition($"x")
val nonEmptyPart = sc.longAccumulator("nonEmptyPart")

df1.foreachPartition(partition =>
  if (partition.nonEmpty) nonEmptyPart.add(1))

val finalDf = df1.coalesce(nonEmptyPart.value.toInt)

println(s"nonEmptyPart => ${nonEmptyPart.value.toInt}")
println(s"df.rdd.partitions.length => ${df1.rdd.partitions.length}")
println(s"finalDf.rdd.partitions.length => ${finalDf.rdd.partitions.length}")

Output

nonEmptyPart => 3
df1.rdd.partitions.length => 200
finalDf.rdd.partitions.length => 3

answered Oct 18 '22 by mrsrinivas