I am trying to sort the values in a column val by another column ts, for each id.
# imports
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
import pandas as pd

# create a Spark session and some dummy data
spark = SparkSession.builder.getOrCreate()
pdf = pd.DataFrame(
    [['2', 2, 'cat'], ['1', 1, 'dog'], ['1', 2, 'cat'], ['2', 3, 'cat'], ['2', 4, 'dog']],
    columns=['id', 'ts', 'val'],
)
sdf = spark.createDataFrame(pdf)
sdf.show()
+---+---+---+
| id| ts|val|
+---+---+---+
| 2| 2|cat|
| 1| 1|dog|
| 1| 2|cat|
| 2| 3|cat|
| 2| 4|dog|
+---+---+---+
You can aggregate by id and sort by ts:
sorted_sdf = sdf.groupBy('id').agg(
    F.sort_array(F.collect_list(F.struct(F.col('ts'), F.col('val'))), asc=True)
     .alias('sorted_col')
)
sorted_sdf.show()
+---+--------------------+
| id| sorted_col|
+---+--------------------+
| 1| [[1,dog], [2,cat]]|
| 2|[[2,cat], [3,cat]...|
+---+--------------------+
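Note that show() truncates wide values by default; you can pass truncate=False to see the full nested lists:

sorted_sdf.show(truncate=False)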
Then, we can explode this list:
explode_sdf = sorted_sdf.select('id', F.explode(F.col('sorted_col')).alias('sorted_explode'))
explode_sdf.show()
+---+--------------+
| id|sorted_explode|
+---+--------------+
| 1| [1,dog]|
| 1| [2,cat]|
| 2| [2,cat]|
| 2| [3,cat]|
| 2| [4,dog]|
+---+--------------+
Break the structs in sorted_explode into two columns:
detupled_sdf = explode_sdf.select('id', 'sorted_explode.*')
detupled_sdf.show()
+---+---+---+
| id| ts|val|
+---+---+---+
| 1| 1|dog|
| 1| 2|cat|
| 2| 2|cat|
| 2| 3|cat|
| 2| 4|dog|
+---+---+---+
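The 'sorted_explode.*' selector expands the struct into one column per field. Equivalently, you can pull the fields out explicitly; this is just a sketch of the same step, using the ts and val field names defined in the struct above:

detupled_sdf = explode_sdf.select(
    'id',
    F.col('sorted_explode.ts').alias('ts'),
    F.col('sorted_explode.val').alias('val'),
)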
Now our original dataframe is sorted by ts for each id!
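If you prefer, the three steps can also be chained into a single expression. This sketch (result is just a hypothetical name) is equivalent to the steps above:

result = (
    sdf.groupBy('id')
       .agg(F.sort_array(F.collect_list(F.struct('ts', 'val'))).alias('s'))  # sort_array is ascending by default
       .select('id', F.explode('s').alias('s'))   # one row per struct
       .select('id', 's.*')                       # expand struct back into ts, val
)
result.show()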