Calculating percentages on a PySpark DataFrame

I have a PySpark DataFrame built from the Titanic data; I have pasted a copy of it below. How would I add a column with the percentage of each bucket?

[screenshot of the grouped Titanic DataFrame: survived, sex, count]

Thanks for the help!

Balla13 asked May 14 '17 at 21:05


2 Answers

First, a literal DataFrame with your input data:

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test").getOrCreate()
df = spark.createDataFrame([
    (1,'female',233),
    (None,'female',314),
    (0,'female',81),
    (1, None, 342), 
    (1, 'male', 109),
    (None, None, 891),
    (0, None, 549),
    (None, 'male', 577),
    (0, None, 468)
    ], 
    ['survived', 'sex', 'count'])

Then we use a window function to calculate the sum of the count column (which is essentially the grand total) over a partition that includes the complete set of rows:

import pyspark.sql.functions as f
from pyspark.sql.window import Window
df = df.withColumn('percent', f.col('count')/f.sum('count').over(Window.partitionBy()))
df.orderBy('percent', ascending=False).show()

+--------+------+-----+--------------------+
|survived|   sex|count|             percent|
+--------+------+-----+--------------------+
|    null|  null|  891|                0.25|
|    null|  male|  577| 0.16189674523007858|
|       0|  null|  549| 0.15404040404040403|
|       0|  null|  468| 0.13131313131313133|
|       1|  null|  342| 0.09595959595959595|
|    null|female|  314| 0.08810325476992144|
|       1|female|  233|  0.0653759820426487|
|       1|  male|  109| 0.03058361391694725|
|       0|female|   81|0.022727272727272728|
+--------+------+-----+--------------------+
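
If you'd rather show a human-readable percentage than a fraction, one small variation (my own formatting choice, not part of the answer) is to scale and round the same windowed expression:

# same windowed total as above, scaled to 0-100 and rounded to 2 decimals
df_pct = df.withColumn(
    'percent',
    f.round(f.col('count') / f.sum('count').over(Window.partitionBy()) * 100, 2))
df_pct.orderBy('percent', ascending=False).show()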

If we split the windowed-sum step into two parts, it is easier to see that the window function sum is just adding the same total value to every row:

df = df\
  .withColumn('total', f.sum('count').over(Window.partitionBy()))\
  .withColumn('percent', f.col('count')/f.col('total'))
df.show()

+--------+------+-----+--------------------+-----+
|survived|   sex|count|             percent|total|
+--------+------+-----+--------------------+-----+
|       1|female|  233|  0.0653759820426487| 3564|
|    null|female|  314| 0.08810325476992144| 3564|
|       0|female|   81|0.022727272727272728| 3564|
|       1|  null|  342| 0.09595959595959595| 3564|
|       1|  male|  109| 0.03058361391694725| 3564|
|    null|  null|  891|                0.25| 3564|
|       0|  null|  549| 0.15404040404040403| 3564|
|    null|  male|  577| 0.16189674523007858| 3564|
|       0|  null|  468| 0.13131313131313133| 3564|
+--------+------+-----+--------------------+-----+
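
As a side note (not something the question asked for), passing a column to partitionBy gives the share within each group instead of the share of the grand total; a quick sketch:

# share of each row's count within its 'sex' group (null sex forms its own group)
df_by_sex = df.withColumn(
    'percent_within_sex',
    f.col('count') / f.sum('count').over(Window.partitionBy('sex')))
df_by_sex.show()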
RubenLaguna answered Oct 12 '22 at 17:10

This is probably the option that uses Spark as it's most 'intended' to be used (i.e. it doesn't involve explicitly collecting the data to the driver, and it doesn't generate any warnings):

df = spark.createDataFrame([
    (1,'female',233),
    (None,'female',314),
    (0,'female',81),
    (1, None, 342), 
    (1, 'male', 109),
    (None, None, 891),
    (0, None, 549),
    (None, 'male', 577),
    (0, None, 468)
    ], 
    ['survived', 'sex', 'count'])

df.registerTempTable("df")

sql = """
select *, count/(select sum(count) from df) as percentage
from df
"""

spark.sql(sql).show()
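
(Side note: registerTempTable still works but is deprecated; createOrReplaceTempView is the current name in Spark 2.x+.) If you prefer the DataFrame API to SQL, a roughly equivalent sketch (my own, reusing the pyspark.sql.functions as f import from the first answer) computes the total once with agg and attaches it via a cross join:

# compute the grand total as a 1-row DataFrame, then cross-join it onto every row
total_df = df.agg(f.sum('count').alias('total'))
df.crossJoin(total_df) \
  .withColumn('percentage', f.col('count') / f.col('total')) \
  .show()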

Note that for a larger dataset of the type you'd typically be processing in Spark, you wouldn't want to use the solution with a window that spans the whole dataset (like w = Window.partitionBy()). In fact, Spark will warn you about this:

WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

To illustrate the difference, here's the non-window version:

sql = """
select *, count/(select sum(count) from df) as percentage
from df
"""

[screenshot: physical plan for the non-window (subquery) query]

Note that at no point do all 9 rows get shuffled to a single executor.
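
Since the plan screenshots don't carry over here, you can inspect this yourself with explain() (standard Spark API; the exact plan text depends on your Spark version):

# print the physical plan for the subquery version; only partial aggregates
# are shuffled to compute the total, not every input row
spark.sql(sql).explain()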

Here's the version with window:

sql = """
select *, count/sum(count) over () as perc
from df
"""

[screenshot: physical plan for the window query]

Note the greater amount of data in the exchange (shuffle) step, and where the single-partition data exchange is happening.
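
For comparison, printing the plan for the window version should show the exchange that moves everything to a single partition (again, exact wording varies by Spark version):

# look for an "Exchange SinglePartition" step feeding the Window operator
spark.sql(sql).explain()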

RobinL answered Oct 12 '22 at 15:10