Here is my dataframe:
The underlying RDD has 2 partitions
When I do df.count, the DAG produced is:
When I do df.rdd.count, the DAG produced is:
Question: count is an action in Spark; the official definition is ‘Returns the number of rows in the DataFrame.’ Why does a shuffle occur when I perform count on the DataFrame, while the same operation on the underlying RDD causes no shuffle?
It makes no sense to me why a shuffle should occur at all. I tried to go through the source code of count (Spark GitHub), but it doesn't fully make sense to me. Is the groupBy being supplied to the action the culprit?
P.S. df.coalesce(1).count does not cause any shuffle.
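A minimal sketch of a comparable setup (the column name and row count are illustrative, not my actual data; assumes a SparkSession called spark):
val df = spark.range(0, 1000, 1, 2).toDF("id")   // DataFrame whose underlying RDD has 2 partitions
df.count()               // DataFrame count: two stages with a shuffle in between
df.rdd.count()           // RDD count: a single stage, no shuffle
df.coalesce(1).count()   // also no shuffle, as noted above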
It seems that the DataFrame's count() is implemented via groupBy(), which results in a shuffle. Below is the code from https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
/**
 * Returns the number of rows in the Dataset.
 * @group action
 * @since 1.6.0
 */
def count(): Long = withAction("count", groupBy().count().queryExecution) { plan =>
  plan.executeCollect().head.getLong(0)
}
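In other words, count() on a Dataset/DataFrame is just the global (no grouping columns) groupBy aggregation collected back to the driver. A minimal sketch of that equivalence, assuming a SparkSession called spark and illustrative data:
val df = spark.range(0, 1000, 1, 2).toDF("id")
val viaAction  = df.count()                                       // 1000
val viaGroupBy = df.groupBy().count().collect().head.getLong(0)   // 1000, the same plan count() builds internally
assert(viaAction == viaGroupBy)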
If you look at RDD's count function, on the other hand, it runs a job that computes the size of each partition (Utils.getIteratorSize), returns those per-partition counts to the driver as an array, and then uses .sum to add up the elements of that array.
Code snippet from https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala:
/**
* Return the number of elements in the RDD.
*/
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
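Roughly speaking, each partition counts its own elements, the per-partition counts are brought back to the driver as a local array, and the driver sums them, so no shuffle stage is needed. A hand-rolled sketch of the same idea (illustrative only, not the actual implementation):
val rdd = spark.sparkContext.parallelize(1 to 1000, 2)
val perPartitionCounts = rdd.mapPartitions(it => Iterator(it.size.toLong)).collect()   // e.g. Array(500, 500)
val total = perPartitionCounts.sum                                                     // 1000, summed on the driver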
When Spark counts a DataFrame, it first computes a partial count for every partition and then runs another stage to sum those partial counts together. This is particularly good for large DataFrames, where distributing the counting across multiple executors actually improves performance.
The place to verify this is the SQL tab of the Spark UI, which shows a physical plan along the following lines:
*HashAggregate(keys=[], functions=[count(1)], output=[count#202L])
+- Exchange SinglePartition
+- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#206L])
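Reading the plan bottom-up: each partition computes a partial_count, the Exchange SinglePartition shuffles those partial results (one small row per partition) to a single partition, and the final HashAggregate adds them up. You can also print the same plan programmatically; since count() builds a groupBy().count() plan internally, explaining that aggregation shows the structure directly (a small sketch, assuming a SparkSession called spark):
spark.range(0, 1000, 1, 2).toDF("id").groupBy().count().explain()
// look for HashAggregate(partial_count) -> Exchange SinglePartition -> HashAggregate(count)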