I have the following DataFrame df:
How can I delete duplicates while keeping the row with the minimum value of level for each duplicated pair of item_id and country_id?
+-----------+----------+---------------+
|item_id |country_id|level |
+-----------+----------+---------------+
| 312330| 13535670| 82|
| 312330| 13535670| 369|
| 312330| 13535670| 376|
| 319840| 69731210| 127|
| 319840| 69730600| 526|
| 311480| 69628930| 150|
| 311480| 69628930| 138|
| 311480| 69628930| 405|
+-----------+----------+---------------+
The expected output:
+-----------+----------+---------------+
|item_id |country_id|level |
+-----------+----------+---------------+
| 312330| 13535670| 82|
| 319840| 69731210| 127|
| 319840| 69730600| 526|
| 311480| 69628930| 138|
+-----------+----------+---------------+
I know how to delete duplicates without conditions using dropDuplicates, but I don't know how to do it for my particular case.
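For reference, here is one way to build the sample DataFrame so the snippets below are runnable end to end (a sketch; I'm assuming plain integer columns and a local SparkSession):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("dedup-min").master("local[*]").getOrCreate()
import spark.implicits._  // enables the $"col" syntax used below

val df = Seq(
  (312330, 13535670, 82),
  (312330, 13535670, 369),
  (312330, 13535670, 376),
  (319840, 69731210, 127),
  (319840, 69730600, 526),
  (311480, 69628930, 150),
  (311480, 69628930, 138),
  (311480, 69628930, 405)
).toDF("item_id", "country_id", "level")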
One method is to orderBy first (ascending by default), then groupBy and take the first value of each group:
import org.apache.spark.sql.functions.first
df.orderBy("level").groupBy("item_id", "country_id").agg(first("level").as("level")).show(false)
You can make the sort order explicit with .asc for ascending or .desc for descending:
df.orderBy($"level".asc).groupBy("item_id", "country_id").agg(first("level").as("level")).show(false)
You can also do the operation with a window and the row_number function:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Rank rows within each (item_id, country_id) group by ascending level,
// then keep only the top-ranked (minimum) row.
val windowSpec = Window.partitionBy("item_id", "country_id").orderBy($"level".asc)
df.withColumn("rank", row_number().over(windowSpec)).filter($"rank" === 1).drop("rank").show()