I have a dataframe in Spark with two columns, group_id and value, where value is a double. I would like to group the data based on group_id, order each group by value, and then add a third column index that represents the position of value in the ordering of values for the group.
For example, considering the following input data:
+--------+-----+
|group_id|value|
+--------+-----+
|1 |1.3 |
|2 |0.8 |
|1 |3.4 |
|1 |-1.7 |
|2 |2.3 |
|2 |5.9 |
|1 |2.7 |
|1 |0.0 |
+--------+-----+
The output would then be something like
+--------+-----+-----+
|group_id|value|index|
+--------+-----+-----+
|1 |-1.7 |1 |
|1 |0.0 |2 |
|1 |1.3 |3 |
|1 |2.7 |4 |
|1 |3.4 |5 |
|2 |0.8 |1 |
|2 |2.3 |2 |
|2 |5.9 |3 |
+--------+-----+-----+
It does not matter whether the index is 0-based or 1-based, or whether the sort is ascending or descending.
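For reference, the sample input above can be built with something like the following (a minimal sketch only; it assumes an active SparkSession named spark, and the variable name df is just for illustration):
# Build the sample input DataFrame (spark and df are illustrative assumptions)
df = spark.createDataFrame(
    [(1, 1.3), (2, 0.8), (1, 3.4), (1, -1.7),
     (2, 2.3), (2, 5.9), (1, 2.7), (1, 0.0)],
    ["group_id", "value"],
)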
As a follow-up, consider the case where there is a third column, extra, in the original data that takes on multiple values for some (group_id, value) combinations. An example is:
+--------+-----+-----+
|group_id|value|extra|
+--------+-----+-----+
|1 |1.3 |1 |
|1 |1.3 |2 |
|2 |0.8 |1 |
|1 |3.4 |1 |
|1 |3.4 |2 |
|1 |3.4 |3 |
|1 |-1.7 |1 |
|2 |2.3 |1 |
|2 |5.9 |1 |
|1 |2.7 |1 |
|1 |0.0 |1 |
+--------+-----+-----+
Is there a way to add an index column such that the extra column is not considered but still kept? The output in this case would be
+--------+-----+-----+-----+
|group_id|value|extra|index|
+--------+-----+-----+-----+
|1 |-1.7 |1 |1 |
|1 |0.0 |1 |2 |
|1 |1.3 |1 |3 |
|1 |1.3 |2 |3 |
|1 |2.7 |1 |4 |
|1 |3.4 |1 |5 |
|1 |3.4 |2 |5 |
|1 |3.4 |3 |5 |
|2 |0.8 |1 |1 |
|2 |2.3 |1 |2 |
|2 |5.9 |1 |3 |
+--------+-----+-----+-----+
I know that it is possible to do this by duplicating the data: drop the extra column, run a distinct operation (which yields the data from the original example), compute the index column using the original solution, and then join the result back with the full data. However, this would involve a lot of extra computation and overhead.
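For concreteness, that workaround could look roughly like the sketch below (hedged; it assumes a DataFrame df with the extra column and uses a window rank, as in the answer below, to compute the per-group index on the de-duplicated rows):
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

# De-duplicate on (group_id, value), rank the distinct rows, then join back
distinct_df = df.select("group_id", "value").distinct()
w = Window.partitionBy("group_id").orderBy("value")
indexed = distinct_df.select("*", rank().over(w).alias("index"))
result = df.join(indexed, on=["group_id", "value"])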
You can use Window functions to create a rank column based on value, partitioned by group_id:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, dense_rank
# Define window
window = Window.partitionBy(df['group_id']).orderBy(df['value'])
# Create column
df.select('*', rank().over(window).alias('index')).show()
+--------+-----+-----+
|group_id|value|index|
+--------+-----+-----+
| 1| -1.7| 1|
| 1| 0.0| 2|
| 1| 1.3| 3|
| 1| 2.7| 4|
| 1| 3.4| 5|
| 2| 0.8| 1|
| 2| 2.3| 2|
| 2| 5.9| 3|
+--------+-----+-----+
Because you first select '*', you keep all the other columns as well. However, your second example shows that you are looking for the function dense_rank(), which gives a rank column with no gaps:
df.select('*', dense_rank().over(window).alias('index'))
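Applied to the second example (a usage sketch; building the DataFrame and the SparkSession spark are assumptions here), ties on value within a group receive the same index while extra is kept untouched:
# Second example: extra takes multiple values for some (group_id, value) pairs
df2 = spark.createDataFrame(
    [(1, 1.3, 1), (1, 1.3, 2), (2, 0.8, 1), (1, 3.4, 1), (1, 3.4, 2),
     (1, 3.4, 3), (1, -1.7, 1), (2, 2.3, 1), (2, 5.9, 1), (1, 2.7, 1), (1, 0.0, 1)],
    ["group_id", "value", "extra"],
)
window2 = Window.partitionBy("group_id").orderBy("value")
# dense_rank ignores extra, so rows with the same value share an index and the ranks have no gaps
df2.select("*", dense_rank().over(window2).alias("index")).show()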