Logo Questions Linux Laravel Mysql Ubuntu Git Menu

Calculating percentage of total count for groupBy using pyspark

I have the following code in pyspark, resulting in a table showing me the different values for a column and their counts. I want to have another column showing what percentage of the total count does each row represent. How do I do that?

difrgns = (df1

Thanks in advance!

like image 966
Giordan Pretelin Avatar asked Sep 11 '18 20:09

Giordan Pretelin

2 Answers

An example as an alternative if not comfortable with Windowing as the comment alludes to and is the better way to go:

# Running in Databricks, not all stuff required
from pyspark.sql import Row
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
#from pyspark.sql.functions import col

data = [("A", "X", 2, 100), ("A", "X", 7, 100), ("B", "X", 10, 100),
        ("C", "X", 1, 100), ("D", "X", 50, 100), ("E", "X", 30, 100)]
rdd = sc.parallelize(data)

someschema = rdd.map(lambda x: Row(c1=x[0], c2=x[1], val1=int(x[2]), val2=int(x[3])))

df = sqlContext.createDataFrame(someschema)

tot = df.count()

df.groupBy("c1") \
  .count() \
  .withColumnRenamed('count', 'cnt_per_group') \
  .withColumn('perc_of_count_total', (F.col('cnt_per_group') / tot) * 100 ) \


| c1|cnt_per_group|perc_of_count_total|
|  E|            1| 16.666666666666664|
|  B|            1| 16.666666666666664|
|  D|            1| 16.666666666666664|
|  C|            1| 16.666666666666664|
|  A|            2|  33.33333333333333|

I focus on Scala and it seems easier with that. That said, the suggested solution via the comments uses Window which is what I would do in Scala with over().

like image 196
thebluephantom Avatar answered Oct 20 '22 14:10


When df itself is a more complex transformation chain and running it twice -- first to compute the total count and then to group and compute percentages -- is too expensive, it's possible to leverage a window function to achieve similar results. Here's a more generalized code (extending bluephantom's answer) that could be used with a number of group-by dimensions:

from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

data = [("A", "X", 2, 100), ("A", "X", 7, 100), ("B", "X", 10, 100),
        ("C", "X", 1, 100), ("D", "X", 50, 100), ("E", "X", 30, 100)]
rdd = sc.parallelize(data)

someschema = rdd.map(lambda x: Row(c1=x[0], c2=x[1], val1=int(x[2]), val2=int(x[3])))

df = (sqlContext.createDataFrame(someschema)
      .withColumn('total_count', count('*').over(Window.partitionBy(<your N-1 dimensions here>)))
     .groupBy(<your N dimensions here>)

like image 42
Dmitry B. Avatar answered Oct 20 '22 14:10

Dmitry B.