I am doing a group by in Spark SQL. Some rows contain the same value with different IDs, and in that case I want to select the first row.
This is my code:
val highvalueresult = highvalue
  .select($"tagShortID", $"Timestamp", $"ListenerShortID", $"rootOrgID", $"subOrgID", $"RSSI_Weight_avg")
  .groupBy("tagShortID", "Timestamp")
  .agg(max($"RSSI_Weight_avg").alias("RSSI_Weight_avg"))
val t2 = averageDF.join(highvalueresult, Seq("tagShortID", "Timestamp", "RSSI_Weight_avg"))
And this is my result:
tag,timestamp,rssi,listner,rootorg,suborg
2,1496745906,0.7,3878,4,3
4,1496745907,0.6,362,4,3
4,1496745907,0.6,718,4,3
4,1496745907,0.6,1901,4,3
In the above result, for the timestamp 1496745907 there are three listeners with the same rssi value. In this case I want to select only the first row.
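To make the desired behaviour concrete, here is a minimal plain-Scala sketch (no Spark) of "keep one row per (tag, timestamp) group". It assumes "first" means the row with the smallest listner id, since the result rows carry no other ordering; the `Row` case class and sample data are illustrative, not from the real pipeline:

```scala
// Illustrative row shape matching the result columns above (rootorg/suborg omitted for brevity)
case class Row(tag: Int, timestamp: Long, rssi: Double, listner: Int)

val rows = Seq(
  Row(2, 1496745906L, 0.7, 3878),
  Row(4, 1496745907L, 0.6, 362),
  Row(4, 1496745907L, 0.6, 718),
  Row(4, 1496745907L, 0.6, 1901)
)

// Group by (tag, timestamp), then keep one row per group.
// "First" is taken to be the smallest listner id (an assumption).
val firstPerGroup = rows
  .groupBy(r => (r.tag, r.timestamp))
  .values
  .map(_.minBy(_.listner))
  .toList
  .sortBy(_.timestamp)
// firstPerGroup == List(Row(2,1496745906,0.7,3878), Row(4,1496745907,0.6,362))
```

This is exactly what a window function with `row_number` does inside Spark, without collecting the data to the driver.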
You can use the window function support in Spark SQL. Assuming your dataframe is:
+---+----------+----+-------+-------+------+
|tag| timestamp|rssi|listner|rootorg|suborg|
+---+----------+----+-------+-------+------+
| 2|1496745906| 0.7| 3878| 4| 3|
| 4|1496745907| 0.6| 362| 4| 3|
| 4|1496745907| 0.6| 718| 4| 3|
| 4|1496745907| 0.6| 1901| 4| 3|
+---+----------+----+-------+-------+------+
Define a window function (partition by your grouping columns; order by the column that decides which row counts as "first" — ordering by the partitioning column itself would make the rank order arbitrary):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val window = Window.partitionBy("timestamp", "rssi").orderBy("listner")
Apply the window function:
res1.withColumn("rank", row_number().over(window))
+---+----------+----+-------+-------+------+----+
|tag| timestamp|rssi|listner|rootorg|suborg|rank|
+---+----------+----+-------+-------+------+----+
| 4|1496745907| 0.6| 362| 4| 3| 1|
| 4|1496745907| 0.6| 718| 4| 3| 2|
| 4|1496745907| 0.6| 1901| 4| 3| 3|
| 2|1496745906| 0.7| 3878| 4| 3| 1|
+---+----------+----+-------+-------+------+----+
Select the first row from each window:
res5.where($"rank" === 1)
+---+----------+----+-------+-------+------+----+
|tag| timestamp|rssi|listner|rootorg|suborg|rank|
+---+----------+----+-------+-------+------+----+
| 4|1496745907| 0.6| 362| 4| 3| 1|
| 2|1496745906| 0.7| 3878| 4| 3| 1|
+---+----------+----+-------+-------+------+----+
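Putting the steps above together, a self-contained sketch of the whole pipeline might look like this. It assumes a local SparkSession and hard-codes the sample rows; the partitioning uses the grouping keys (tag, timestamp) and breaks ties by listner, which you should adjust to whatever "first" means for your data:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val spark = SparkSession.builder.master("local[*]").appName("first-row-per-group").getOrCreate()
import spark.implicits._

// Sample data matching the tables above
val df = Seq(
  (2, 1496745906L, 0.7, 3878, 4, 3),
  (4, 1496745907L, 0.6, 362, 4, 3),
  (4, 1496745907L, 0.6, 718, 4, 3),
  (4, 1496745907L, 0.6, 1901, 4, 3)
).toDF("tag", "timestamp", "rssi", "listner", "rootorg", "suborg")

// Partition by the grouping keys; order by listner so "first" is deterministic (an assumption)
val window = Window.partitionBy("tag", "timestamp").orderBy("listner")

val first = df
  .withColumn("rank", row_number().over(window))
  .where($"rank" === 1)
  .drop("rank")

first.show()
```

Unlike the groupBy + join approach in the question, this keeps all the other columns in one pass and returns exactly one row per group even when rssi values tie.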