I have the following PySpark code, which produces a table of the distinct values in a column and their counts. I want to add another column showing what percentage of the total count each row represents. How do I do that?
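from pyspark.sql.functions import desc
# Keep the DataFrame in difrgns; show() returns None, so call it separately
difrgns = (df1
.groupBy("column_name")
.count()
.sort(desc("count")))
difrgns.show()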
Thanks in advance!
Here is an alternative example in case you are not comfortable with window functions. The windowing approach that the comment alludes to is the better way to go, though:
# Running in Databricks, not all stuff required
from pyspark.sql import Row
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
#from pyspark.sql.functions import col
data = [("A", "X", 2, 100), ("A", "X", 7, 100), ("B", "X", 10, 100),
("C", "X", 1, 100), ("D", "X", 50, 100), ("E", "X", 30, 100)]
rdd = sc.parallelize(data)
someschema = rdd.map(lambda x: Row(c1=x[0], c2=x[1], val1=int(x[2]), val2=int(x[3])))
df = sqlContext.createDataFrame(someschema)
tot = df.count()
df.groupBy("c1") \
.count() \
.withColumnRenamed('count', 'cnt_per_group') \
.withColumn('perc_of_count_total', (F.col('cnt_per_group') / tot) * 100 ) \
.show()
returns:
+---+-------------+-------------------+
| c1|cnt_per_group|perc_of_count_total|
+---+-------------+-------------------+
|  E|            1| 16.666666666666664|
|  B|            1| 16.666666666666664|
|  D|            1| 16.666666666666664|
|  C|            1| 16.666666666666664|
|  A|            2|  33.33333333333333|
+---+-------------+-------------------+
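The figures in that table can be cross-checked without Spark. Below is a minimal plain-Python sketch that assumes only the `c1` values from the sample data. The helper name `percent_of_total` is made up for illustration and is not part of PySpark:

```python
from collections import Counter


def percent_of_total(values):
    """Return {value: (count, percent of all rows)} for an iterable of values."""
    values = list(values)
    total = len(values)          # plays the role of tot = df.count()
    counts = Counter(values)     # plays the role of groupBy("c1").count()
    return {key: (cnt, cnt / total * 100) for key, cnt in counts.items()}


# c1 column of the sample data used in the answer
c1_values = ["A", "A", "B", "C", "D", "E"]
result = percent_of_total(c1_values)

for key, (cnt, pct) in sorted(result.items()):
    print(key, cnt, round(pct, 2))
```

The percentages should sum to 100, which is a quick sanity check worth running on the Spark result as well.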
I mostly work in Scala, where this seems easier. That said, the solution suggested in the comments uses a Window, which is what I would do in Scala with over().
When df itself is a more complex transformation chain and running it twice (first to compute the total count, then to group and compute percentages) is too expensive, you can use a window function to achieve similar results. Here is more general code (extending bluephantom's answer) that works with any number of group-by dimensions:
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
data = [("A", "X", 2, 100), ("A", "X", 7, 100), ("B", "X", 10, 100),
("C", "X", 1, 100), ("D", "X", 50, 100), ("E", "X", 30, 100)]
rdd = sc.parallelize(data)
someschema = rdd.map(lambda x: Row(c1=x[0], c2=x[1], val1=int(x[2]), val2=int(x[3])))
df = (sqlContext.createDataFrame(someschema)
.withColumn('total_count', count('*').over(Window.partitionBy(<your N-1 dimensions here>)))
.groupBy(<your N dimensions here>)
.agg((count('*') / first(col('total_count')) * 100).alias('percent_total'))  # scale the fraction to a percentage
)
df.show()
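To make the placeholders concrete, here is a plain-Python sketch of what that kind of query computes. It assumes the N dimensions are `c1` and `c2` and the N-1 window dimensions are just `c2`; that choice is mine, for illustration only. Each group's count is divided by the row count of its window partition, and the result is multiplied by 100 here so it reads as a percentage. `share_within_partition` is a hypothetical helper, not a PySpark function:

```python
from collections import Counter

# Same sample rows as above: (c1, c2, val1, val2)
data = [("A", "X", 2, 100), ("A", "X", 7, 100), ("B", "X", 10, 100),
        ("C", "X", 1, 100), ("D", "X", 50, 100), ("E", "X", 30, 100)]


def share_within_partition(rows, group_key, partition_key):
    """Percent of each group's rows relative to its partition's row count."""
    # Rough equivalent of count('*').over(Window.partitionBy(...))
    partition_totals = Counter(partition_key(r) for r in rows)
    # Rough equivalent of groupBy(...).agg(count('*'))
    group_counts = Counter((group_key(r), partition_key(r)) for r in rows)
    return {
        group: cnt / partition_totals[part] * 100
        for (group, part), cnt in group_counts.items()
    }


shares = share_within_partition(
    data,
    group_key=lambda r: (r[0], r[1]),   # assumed N dimensions: c1, c2
    partition_key=lambda r: (r[1],),    # assumed N-1 dimensions: c2
)

for group, pct in sorted(shares.items()):
    print(group, round(pct, 2))
```

With a single value of `c2` in the sample data, every group is compared against all six rows, so the numbers match the simpler count-based answer. With several `c2` values, each percentage would be relative to its own `c2` partition instead.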