From my understanding, the first()/last() functions in Spark retrieve the first/last row of each partition. I am not able to understand why the last() function is giving incorrect results.
This is my code.
from pyspark.sql import Window
from pyspark.sql.functions import first, last

AgeWindow = Window.partitionBy('Dept').orderBy('Age')
df1 = df1.withColumn('first(ID)', first('ID').over(AgeWindow))\
         .withColumn('last(ID)', last('ID').over(AgeWindow))
df1.show()
+---+----------+---+--------+---------+--------+
|Age|      Dept| ID|    Name|first(ID)|last(ID)|
+---+----------+---+--------+---------+--------+
| 38|  medicine|  4|   harry|        4|       4|
| 41|  medicine|  5|hermione|        4|       5|
| 55|  medicine|  7| gandalf|        4|       7|
| 15|technology|  6|  sirius|        6|       6|
| 49|technology|  9|     sam|        6|       9|
| 88|technology|  1|     sam|        6|       2|
| 88|technology|  2|     nik|        6|       2|
| 75|       mba|  8|   ginny|        8|      11|
| 75|       mba| 10|     sam|        8|      11|
| 75|       mba|  3|     ron|        8|      11|
| 75|       mba| 11|     ron|        8|      11|
+---+----------+---+--------+---------+--------+
From the documentation:

pyspark.sql.functions.first(col, ignorenulls=False)
Aggregate function: returns the first value in a group. The function by default returns the first value it sees.

pyspark.sql.functions.last(col, ignorenulls=False)
Aggregate function: returns the last value in a group. The function by default returns the last value it sees. It will return the last non-null value it sees when ignoreNulls is set to true.
A PySpark window function performs statistical operations such as rank or row number over a group, frame, or collection of rows and returns a result for each row individually; window functions are also commonly used for general data transformations. The first() function returns the first element in the column, or the first non-null element when ignoreNulls is set to True. The last() function returns the last element in the column, or the last non-null element when ignoreNulls is set to True.
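To make the ignoreNulls behaviour concrete, here is a minimal sketch of first()/last() used as plain aggregate functions; the data and column names are made up for illustration and are not from the question:

from pyspark.sql import SparkSession
from pyspark.sql.functions import first, last

spark = SparkSession.builder.getOrCreate()

# Hypothetical data: the group contains null IDs at both ends
df_nulls = spark.createDataFrame(
    [("medicine", None), ("medicine", 4), ("medicine", 7), ("medicine", None)],
    ("Dept", "ID")
)

df_nulls.groupBy("Dept").agg(
    first("ID").alias("first_default"),               # first value seen -> null (given this input order)
    first("ID", ignorenulls=True).alias("first_nn"),  # first non-null value -> 4
    last("ID", ignorenulls=True).alias("last_nn")     # last non-null value -> 7
).show()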
It is not incorrect. Your window definition is just not what you think it is.
If you provide an ORDER BY clause then the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:
from pyspark.sql.window import Window
from pyspark.sql.functions import first, last
w = Window.partitionBy('Dept').orderBy('Age')
df = spark.createDataFrame(
[(38, "medicine", 4), (41, "medicine", 5), (55, "medicine", 7)],
("Age", "Dept", "ID")
)
df.select(
"*",
first('ID').over(w).alias("first_id"),
last('ID').over(w).alias("last_id")
).explain()
== Physical Plan ==
Window [first(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS first_id#38L, last(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS last_id#40L], [Dept#23], [Age#22L ASC NULLS FIRST]
+- *(1) Sort [Dept#23 ASC NULLS FIRST, Age#22L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(Dept#23, 200)
+- Scan ExistingRDD[Age#22L,Dept#23,ID#24L]
This means that the window function never looks ahead and the last row in the frame is the current row.
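As a sanity check (this spelled-out window is my own addition, not part of the original answer), declaring that implicit frame explicitly should reproduce the same running last_id behaviour as the plan above:

# Explicit version of the default frame implied by orderBy('Age')
w_explicit = (Window
    .partitionBy('Dept')
    .orderBy('Age')
    .rangeBetween(Window.unboundedPreceding, Window.currentRow))

df.select(
    "*",
    last('ID').over(w_explicit).alias("last_id")  # the "last" row in the frame is the current row
).show()

Because the frame is RANGE-based, rows that tie on the ordering column share a frame, which is why both Age-88 rows in the question's output show last(ID) = 2.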
You should redefine the window so that the frame spans the whole partition:
w_uf = (Window
.partitionBy('Dept')
.orderBy('Age')
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
result = df.select(
"*",
first('ID').over(w_uf).alias("first_id"),
last('ID').over(w_uf).alias("last_id")
)
== Physical Plan ==
Window [first(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS first_id#56L, last(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS last_id#58L], [Dept#23], [Age#22L ASC NULLS FIRST]
+- *(1) Sort [Dept#23 ASC NULLS FIRST, Age#22L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(Dept#23, 200)
+- Scan ExistingRDD[Age#22L,Dept#23,ID#24L]
result.show()
+---+--------+---+--------+-------+
|Age| Dept| ID|first_id|last_id|
+---+--------+---+--------+-------+
| 38|medicine| 4| 4| 7|
| 41|medicine| 5| 4| 7|
| 55|medicine| 7| 4| 7|
+---+--------+---+--------+-------+
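If all you need is the last value per partition, another option (my own suggestion, not part of the original answer) is to keep the default frame and take first() over the same window ordered in reverse:

from pyspark.sql.functions import col

# Reverse the ordering so the highest-Age row becomes the first row of the frame
w_desc = Window.partitionBy('Dept').orderBy(col('Age').desc())

df.select(
    "*",
    first('ID').over(w_desc).alias("last_id_by_age")  # ID of the highest-Age row per Dept
).show()

Keep in mind that ties on Age (like the two Age-88 rows above) make this non-deterministic unless you add a tie-breaking column to the ordering.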