Creating indices for each group in Spark dataframe

I have a dataframe in Spark with 2 columns, group_id and value, where value is a double. I would like to group the data based on the group_id, order each group by value, and then add a third column index that represents the position of value in the ordering of values for the group.

For example, considering the following input data:

+--------+-----+
|group_id|value|
+--------+-----+
|1       |1.3  |
|2       |0.8  |
|1       |3.4  |
|1       |-1.7 |
|2       |2.3  |
|2       |5.9  |
|1       |2.7  |
|1       |0.0  |
+--------+-----+

The output would then be something like

+--------+-----+-----+
|group_id|value|index|
+--------+-----+-----+
|1       |-1.7 |1    |
|1       |0.0  |2    |
|1       |1.3  |3    |
|1       |2.7  |4    |
|1       |3.4  |5    |
|2       |0.8  |1    |
|2       |2.3  |2    |
|2       |5.9  |3    |
+--------+-----+-----+

It is unimportant whether the index is 0-based or 1-based, and whether the sort is ascending or descending.
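
For reference, the input above can be constructed with something like the following (assuming PySpark; the names spark and df are just illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Build the example input: two columns, group_id and value
df = spark.createDataFrame(
    [(1, 1.3), (2, 0.8), (1, 3.4), (1, -1.7),
     (2, 2.3), (2, 5.9), (1, 2.7), (1, 0.0)],
    ['group_id', 'value'],
)
df.show()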

As a follow-up, consider the case where there is a third column, extra, in the original data that takes on multiple values for some (group_id, value) combinations. An example is:

+--------+-----+-----+
|group_id|value|extra|
+--------+-----+-----+
|1       |1.3  |1    |
|1       |1.3  |2    |
|2       |0.8  |1    |
|1       |3.4  |1    |
|1       |3.4  |2    |
|1       |3.4  |3    |
|1       |-1.7 |1    |
|2       |2.3  |1    |
|2       |5.9  |1    |
|1       |2.7  |1    |
|1       |0.0  |1    |
+--------+-----+-----+
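
Continuing the sketch above, this extended input could be built as follows (again assuming PySpark; df2 is an illustrative name):

# Extended example with the additional extra column
df2 = spark.createDataFrame(
    [(1, 1.3, 1), (1, 1.3, 2), (2, 0.8, 1), (1, 3.4, 1),
     (1, 3.4, 2), (1, 3.4, 3), (1, -1.7, 1), (2, 2.3, 1),
     (2, 5.9, 1), (1, 2.7, 1), (1, 0.0, 1)],
    ['group_id', 'value', 'extra'],
)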

Is there a way to add an index column such that the extra column is not considered but still kept? The output in this case would be

+--------+-----+-----+-----+
|group_id|value|extra|index|
+--------+-----+-----+-----+
|1       |-1.7 |1    |1    |
|1       |0.0  |1    |2    |
|1       |1.3  |1    |3    |
|1       |1.3  |2    |3    |
|1       |2.7  |1    |4    |
|1       |3.4  |1    |5    |
|1       |3.4  |2    |5    |
|1       |3.4  |3    |5    |
|2       |0.8  |1    |1    |
|2       |2.3  |1    |2    |
|2       |5.9  |1    |3    |
+--------+-----+-----+-----+

I know that it is possible to do this with the following approach:

  1. Duplicating the data
  2. Dropping the extra column
  3. Performing a distinct operation, which would yield the data from the original example
  4. Computing the index column using the solution for the original problem
  5. Joining the result with the data from the second example

However, this would involve a lot of extra computation and overhead.
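
For reference, a rough sketch of that workaround in PySpark might look like this (df2 is the DataFrame with the extra column; all names are illustrative):

from pyspark.sql.window import Window
from pyspark.sql.functions import rank

# Steps 1-3: drop the extra column and keep only distinct (group_id, value) rows
deduped = df2.drop('extra').distinct()

# Step 4: compute the index within each group, ordered by value
window = Window.partitionBy('group_id').orderBy('value')
indexed = deduped.withColumn('index', rank().over(window))

# Step 5: join the indices back onto the full data, restoring the extra column
result = df2.join(indexed, on=['group_id', 'value'])

The join on (group_id, value) restores the extra column, but the distinct and the join each introduce a shuffle, which is the extra computation and overhead mentioned above.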

Jon Claus asked Mar 03 '17 at 20:03


1 Answer

You can use Window functions to create a rank column based on value, partitioned by group_id:

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, dense_rank
# Define window
window = Window.partitionBy(df['group_id']).orderBy(df['value'])
# Create column
df.select('*', rank().over(window).alias('index')).show()
+--------+-----+-----+
|group_id|value|index|
+--------+-----+-----+
|       1| -1.7|    1|
|       1|  0.0|    2|
|       1|  1.3|    3|
|       1|  2.7|    4|
|       1|  3.4|    5|
|       2|  0.8|    1|
|       2|  2.3|    2|
|       2|  5.9|    3|
+--------+-----+-----+

Because you first select '*', the code above keeps all the other columns as well. However, your second example shows that you are looking for the function dense_rank(), which gives a rank column with no gaps:

df.select('*', dense_rank().over(window).alias('index'))
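
On the follow-up data, rank() would leave gaps after tied values (1, 2, 3, 3, 5, ...), whereas dense_rank() produces the gap-free indices shown in the desired output (1, 2, 3, 3, 4, ...). A minimal sketch comparing the two, assuming the extended DataFrame from the second example is called df2:

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, dense_rank

window = Window.partitionBy('group_id').orderBy('value')

# rank() leaves gaps after ties, dense_rank() does not
df2.select(
    '*',
    rank().over(window).alias('rank'),
    dense_rank().over(window).alias('index')
).show()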
mtoto answered Dec 12 '22 at 22:12