PySpark / Spark Window Function First / Last Issue

From my understanding, the first/last functions in Spark should retrieve the first/last row of each partition. I am not able to understand why the last function is giving incorrect results.

This is my code.

from pyspark.sql.window import Window
from pyspark.sql.functions import first, last

AgeWindow = Window.partitionBy('Dept').orderBy('Age')
df1 = df1.withColumn('first(ID)', first('ID').over(AgeWindow))\
         .withColumn('last(ID)', last('ID').over(AgeWindow))
df1.show()
+---+----------+---+--------+---------+--------+
|Age|      Dept| ID|    Name|first(ID)|last(ID)|
+---+----------+---+--------+---------+--------+
| 38|  medicine|  4|   harry|        4|       4|
| 41|  medicine|  5|hermione|        4|       5|
| 55|  medicine|  7| gandalf|        4|       7|
| 15|technology|  6|  sirius|        6|       6|
| 49|technology|  9|     sam|        6|       9|
| 88|technology|  1|     sam|        6|       2|
| 88|technology|  2|     nik|        6|       2|
| 75|       mba|  8|   ginny|        8|      11|
| 75|       mba| 10|     sam|        8|      11|
| 75|       mba|  3|     ron|        8|      11|
| 75|       mba| 11|     ron|        8|      11|
+---+----------+---+--------+---------+--------+
asked Sep 11 '18 by Nikhil Redij

People also ask

What is F.first in PySpark?

pyspark.sql.functions.first(col, ignorenulls=False). Aggregate function: returns the first value in a group. By default the function returns the first value it sees; when ignorenulls is set to True, it returns the first non-null value.
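For illustration, a minimal sketch of first() as a grouping aggregate; the data and column names here are invented for the example:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [("medicine", None), ("medicine", 4), ("mba", 8)],
    ("Dept", "ID")
)

# With ignorenulls=True nulls are skipped; with the default
# ignorenulls=False the first value seen may be null. Note that
# without an explicit ordering, which row counts as "first" in a
# groupBy is not deterministic.
df.groupBy("Dept").agg(
    F.first("ID", ignorenulls=True).alias("first_id")
).show()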

How do you use last in PySpark?

pyspark.sql.functions.last(col, ignorenulls=False). Aggregate function: returns the last value in a group. By default the function returns the last value it sees; when ignorenulls is set to True, it returns the last non-null value it sees.
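A common use of this (a sketch with invented column names, not taken from the question above) is a forward fill, where last() with ignorenulls=True over an ordered window carries the most recent non-null value forward:

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1, 10), (2, None), (3, None), (4, 40)],
    ("t", "value")
)

# Default frame: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW,
# so each row sees only itself and earlier rows.
w = Window.orderBy("t")

# Each row gets the most recent non-null value at or before it:
# 10, 10, 10, 40.
df.withColumn("filled", F.last("value", ignorenulls=True).over(w)).show()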

How does window work in PySpark?

A PySpark window function performs statistical operations such as rank or row number over a group, frame, or collection of rows and returns a result for each row individually. Window functions are also widely used for general data transformations.
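For example, a ranking window function keeps every input row and attaches a per-partition result to each (a sketch, with data invented for illustration):

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [("medicine", 38), ("medicine", 55), ("technology", 15)],
    ("Dept", "Age")
)

w = Window.partitionBy("Dept").orderBy("Age")

# Unlike groupBy().agg(), no rows are collapsed: every row keeps its
# columns and gains a rank computed within its Dept partition.
df.withColumn("rank_in_dept", F.row_number().over(w)).show()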

What is first() in Spark SQL?

The first() function returns the first element present in the column; when ignoreNulls is set to True, it returns the first non-null element. The last() function returns the last element present in the column; when ignoreNulls is set to True, it returns the last non-null element.


1 Answer

It is not incorrect. Your window definition is just not what you think it is.

If you provide an ORDER BY clause, then the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:

from pyspark.sql.window import Window
from pyspark.sql.functions import first, last

w = Window.partitionBy('Dept').orderBy('Age')

df = spark.createDataFrame(
    [(38, "medicine", 4), (41, "medicine", 5), (55, "medicine", 7)],
    ("Age", "Dept", "ID")
)

df.select(
    "*",
    first('ID').over(w).alias("first_id"), 
    last('ID').over(w).alias("last_id")
).explain()
== Physical Plan ==
Window [first(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS first_id#38L, last(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS last_id#40L], [Dept#23], [Age#22L ASC NULLS FIRST]
+- *(1) Sort [Dept#23 ASC NULLS FIRST, Age#22L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(Dept#23, 200)
      +- Scan ExistingRDD[Age#22L,Dept#23,ID#24L]

This means that the window function never looks ahead and the last row in the frame is the current row.
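To make that explicit, the implicit default frame can be written out by hand; this sketch (reusing the df, first, and last defined above) reproduces the behaviour observed in the question:

# Equivalent to plain partitionBy(...).orderBy(...): the frame ends
# at the current row, so last('ID') is always the current row's ID
# (modulo ties on Age, which RANGE frames include).
w_default = (Window
    .partitionBy('Dept')
    .orderBy('Age')
    .rangeBetween(Window.unboundedPreceding, Window.currentRow))

df.select(
    "*",
    first('ID').over(w_default).alias("first_id"),
    last('ID').over(w_default).alias("last_id")
).show()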

You should redefine the window as

w_uf = (Window
   .partitionBy('Dept')
   .orderBy('Age')
   .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

result = df.select(
    "*", 
    first('ID').over(w_uf).alias("first_id"),
    last('ID').over(w_uf).alias("last_id")
)
result.explain()
== Physical Plan ==
Window [first(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS first_id#56L, last(ID#24L, false) windowspecdefinition(Dept#23, Age#22L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS last_id#58L], [Dept#23], [Age#22L ASC NULLS FIRST]
+- *(1) Sort [Dept#23 ASC NULLS FIRST, Age#22L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(Dept#23, 200)
      +- Scan ExistingRDD[Age#22L,Dept#23,ID#24L]
result.show()
+---+--------+---+--------+-------+
|Age|    Dept| ID|first_id|last_id|
+---+--------+---+--------+-------+
| 38|medicine|  4|       4|      7|
| 41|medicine|  5|       4|      7|
| 55|medicine|  7|       4|      7|
+---+--------+---+--------+-------+
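As a side note (an alternative trick, not part of the answer above): since the default frame already contains everything up to the current row, the per-partition last value can also be obtained with first() over a window ordered descending, at the cost of tie-breaking subtleties on duplicate Age values:

from pyspark.sql.functions import desc

# "last by ascending Age" becomes "first by descending Age", so the
# default frame is sufficient and no unboundedFollowing is needed.
w_desc = Window.partitionBy('Dept').orderBy(desc('Age'))

df.select("*", first('ID').over(w_desc).alias("last_id")).show()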
answered Oct 05 '22 by zero323