Calculating percentages on a PySpark DataFrame

I have a PySpark DataFrame built from the Titanic data; a copy is pasted below. How would I add a column with the percentage for each bucket?

2 Answers

First a literal DataFrame with your input data:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test").getOrCreate()

# Each row is (survived, sex, count)
df = spark.createDataFrame([
    (1, 'female', 233),
    (None, 'female', 314),
    (0, 'female', 81),
    (1, None, 342),
    (1, 'male', 109),
    (None, None, 891),
    (0, None, 549),
    (None, 'male', 577),
    (0, None, 468)
    ],
    ['survived', 'sex', 'count'])

Then we use a window function to calculate the sum of the count column (essentially the total count) over a partition that includes the complete set of rows:

import pyspark.sql.functions as f
from pyspark.sql.window import Window
df = df.withColumn('percent', f.col('count')/f.sum('count').over(Window.partitionBy()))
df.orderBy('percent', ascending=False).show()

+--------+------+-----+--------------------+
|survived|   sex|count|             percent|
+--------+------+-----+--------------------+
|    null|  null|  891|                0.25|
|    null|  male|  577| 0.16189674523007858|
|       0|  null|  549| 0.15404040404040403|
|       0|  null|  468| 0.13131313131313133|
|       1|  null|  342| 0.09595959595959595|
|    null|female|  314| 0.08810325476992144|
|       1|female|  233|  0.0653759820426487|
|       1|  male|  109| 0.03058361391694725|
|       0|female|   81|0.022727272727272728|
+--------+------+-----+--------------------+

If we split the above step into two, it is easier to see that the window function sum simply adds the same total value to every row:

df = df\
  .withColumn('total', f.sum('count').over(Window.partitionBy()))\
  .withColumn('percent', f.col('count')/f.col('total'))

+--------+------+-----+--------------------+-----+
|survived|   sex|count|             percent|total|
+--------+------+-----+--------------------+-----+
|       1|female|  233|  0.0653759820426487| 3564|
|    null|female|  314| 0.08810325476992144| 3564|
|       0|female|   81|0.022727272727272728| 3564|
|       1|  null|  342| 0.09595959595959595| 3564|
|       1|  male|  109| 0.03058361391694725| 3564|
|    null|  null|  891|                0.25| 3564|
|       0|  null|  549| 0.15404040404040403| 3564|
|    null|  male|  577| 0.16189674523007858| 3564|
|       0|  null|  468| 0.13131313131313133| 3564|
+--------+------+-----+--------------------+-----+
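
For readers who want to check the arithmetic without a Spark session, the two steps can be mirrored in plain Python. This is only a sketch of the calculation, not of how Spark executes it; the list `counts` is simply the nine values of the count column copied from the table above.

```python
# Plain-Python mirror of the two withColumn steps (no Spark needed).
# The values are the nine 'count' entries from the example DataFrame.
counts = [233, 314, 81, 342, 109, 891, 549, 577, 468]

# Step 1: the sum over the whole "window" is a single number,
# repeated once per row (this is the 'total' column).
total = sum(counts)
totals = [total] * len(counts)

# Step 2: divide each row's count by the repeated total
# (this is the 'percent' column).
percents = [c / t for c, t in zip(counts, totals)]

for c, t, p in zip(counts, totals, percents):
    print(c, t, p)
```

The total of 3564 adds up every row, including the rows where survived or sex is null, which is why the 891 row comes out as 0.25.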

This is probably the option that uses Spark the way it is most 'intended' to be used (i.e. it doesn't involve explicitly collecting the data to the driver, and doesn't result in any warnings being generated):

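# Each row is (survived, sex, count)
df = spark.createDataFrame([
    (1, 'female', 233),
    (None, 'female', 314),
    (0, 'female', 81),
    (1, None, 342),
    (1, 'male', 109),
    (None, None, 891),
    (0, None, 549),
    (None, 'male', 577),
    (0, None, 468)
    ],
    ['survived', 'sex', 'count'])

# Register the DataFrame so it can be queried by name in SQL
df.createOrReplaceTempView("df")

sql = """
select *, count/(select sum(count) from df) as percentage
from df
"""
spark.sql(sql).show()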

Note that for a larger dataset of the type you'd typically be processing in Spark, you wouldn't want to use the solution with a window that spans the whole dataset (like w = Window.partitionBy()). In fact Spark will warn you about this:

WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

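To illustrate the difference, here's the non-window version:

sql = """
select *, count/(select sum(count) from df) as percentage
from df
"""
# Print the physical plan to inspect the exchange (shuffle) steps;
# assumes df is registered as a temp view named "df"
spark.sql(sql).explain()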

Note that at no point do all 9 rows get shuffled to a single executor.
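
One way to picture why the rows can stay where they are is a two-phase sum: each partition reduces its own rows to one number, and only those partial sums are combined. The plain-Python sketch below is a conceptual model under that assumption, not Spark's actual execution; the split of the nine counts into three `partitions` is hypothetical.

```python
# Conceptual model only: hypothetical partitions, not Spark's real execution.
partitions = [
    [233, 314, 81],
    [342, 109, 891],
    [549, 577, 468],
]

# Phase 1: each partition reduces its own rows to a single partial sum.
partial_sums = [sum(rows) for rows in partitions]

# Phase 2: only the partial sums (one number per partition) are combined.
total = sum(partial_sums)

# Phase 3: the scalar total is handed back and each partition divides locally,
# so the original rows never have to be gathered in one place.
percentages = [[count / total for count in rows] for rows in partitions]

print(partial_sums, total)
```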

Here's the version with a window:

sql = """
select *, count/sum(count) over () as perc
from df
"""
# Print the physical plan to compare it with the non-window version
spark.sql(sql).explain()

Note the greater amount of data in the exchange (shuffle) step, and where the single-partition data exchange happens.

