Group by, rank and aggregate a Spark DataFrame using PySpark

I have a dataframe that looks like:

A     B    C
---------------
A1    B1   0.8
A1    B2   0.55
A1    B3   0.43

A2    B1   0.7
A2    B2   0.5
A2    B3   0.5

A3    B1   0.2
A3    B2   0.3
A3    B3   0.4

How do I convert column 'C' into a relative rank (higher score -> better rank) within each value of column 'A'? Expected output:

A     B    Rank
---------------
A1    B1   1
A1    B2   2
A1    B3   3

A2    B1   1
A2    B2   2
A2    B3   2

A3    B1   3
A3    B2   2
A3    B3   1

The final state I want to reach is to group by column 'B' and collect the ranks for each 'A':

Example:

B    Ranks
B1   [1,1,3]
B2   [2,2,2]
B3   [3,2,1]
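
For reference, a minimal sketch that builds this sample data, assuming a SparkSession is available as 'spark' (the variable names are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# The (A, B, C) rows from the question.
df = spark.createDataFrame(
    [("A1", "B1", 0.8), ("A1", "B2", 0.55), ("A1", "B3", 0.43),
     ("A2", "B1", 0.7), ("A2", "B2", 0.5), ("A2", "B3", 0.5),
     ("A3", "B1", 0.2), ("A3", "B2", 0.3), ("A3", "B3", 0.4)],
    ["A", "B", "C"])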
asked Jan 15 '17 by futurenext110


People also ask

How do you use groupBy and AGG in PySpark?

Method 1: using the groupBy() method. In PySpark, groupBy() collects identical values into groups on the DataFrame so that aggregate functions can be run on each group. Here the aggregate function is sum(), which returns the total of the values in each group.
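
A minimal sketch of that pattern, reusing the question's schema (columns 'B' and 'C'; 'df' is the question's DataFrame and 'total_C' is an illustrative alias):

from pyspark.sql import functions as F

# Total of C for each distinct value of B.
totals = df.groupBy("B").agg(F.sum("C").alias("total_C"))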

How does PySpark sort grouped data?

Sort the grouped result with the sort() function, accessing the column with col() and applying desc() to sort it in descending order.
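
A sketch of that, assuming the 'totals' DataFrame from the previous snippet:

from pyspark.sql.functions import col

# Highest total first.
totals.sort(col("total_C").desc()).show()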

Can we use groupBy without aggregate function in PySpark?

Not in the pandas sense: at best you can use first() or last() to pull representative values out of a groupBy, but you cannot get everything the way you can in pandas. Since there is a basic difference between how pandas and Spark handle data, not all functionality can be used in the same way.
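
A sketch of that workaround, taking a representative value per group rather than a true aggregate (the 'first_C' alias is illustrative):

from pyspark.sql import functions as F

# first() keeps one value per group instead of combining them.
firsts = df.groupBy("B").agg(F.first("C").alias("first_C"))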


2 Answers

Add rank:

from pyspark.sql.functions import dense_rank, desc, collect_list, struct, sort_array
from pyspark.sql.window import Window

# Rank within each A partition, highest C first; dense_rank() lets ties share a rank.
ranked = df.withColumn(
    "rank",
    dense_rank().over(Window.partitionBy("A").orderBy(desc("C"))))

Group by:

# Collect an (A, rank) struct for every row of each B.
grouped = ranked.groupBy("B").agg(
    collect_list(struct("A", "rank")).alias("tmp"))

Sort and select:

# sort_array() orders the structs by their first field, A, so each B's
# ranks come out in A1, A2, A3 order.
grouped.select("B", sort_array("tmp")["rank"].alias("ranks"))
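
Chained on the question's sample data, the three steps should yield the requested aggregation (a sketch; row order in show() is not guaranteed):

grouped.select("B", sort_array("tmp")["rank"].alias("ranks")).show()

# +---+---------+
# |  B|    ranks|
# +---+---------+
# | B1|[1, 1, 3]|
# | B2|[2, 2, 2]|
# | B3|[3, 2, 1]|
# +---+---------+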

Tested with Spark 2.1.0.

answered Sep 21 '22 by user7337271


from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# row_number() gives a strict 1..n ordering within each partition;
# unlike dense_rank(), tied values do not share a rank.
windowSpec = Window.partitionBy("col1").orderBy("col2")
ranked = demand.withColumn("col_rank", row_number().over(windowSpec))
ranked.show(1000)
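
Note that because row_number() assigns a strict 1..n sequence, applying it to the question's data (partition by "A", order by desc("C")) would give A2's tied 0.5 scores ranks 2 and 3, whereas dense_rank(), as in the first answer, gives both rank 2 and matches the expected output.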
answered Sep 20 '22 by Laxman Jeergal