Let me explain my question through an example.
Assume we have a DataFrame as follows:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

original_df = spark.createDataFrame([('x', 10), ('x', 15), ('x', 10), ('x', 25), ('y', 20), ('y', 10), ('y', 20)], ["key", "price"])
original_df.show()
Output:
+---+-----+
|key|price|
+---+-----+
| x| 10|
| x| 15|
| x| 10|
| x| 25|
| y| 20|
| y| 10|
| y| 20|
+---+-----+
And assume I want to get a list of prices for each key, using a window:
w = Window.partitionBy('key')
original_df.withColumn('price_list', F.collect_list('price').over(w)).show()
Output:
+---+-----+----------------+
|key|price| price_list|
+---+-----+----------------+
| x| 10|[10, 15, 10, 25]|
| x| 15|[10, 15, 10, 25]|
| x| 10|[10, 15, 10, 25]|
| x| 25|[10, 15, 10, 25]|
| y| 20| [20, 10, 20]|
| y| 10| [20, 10, 20]|
| y| 20| [20, 10, 20]|
+---+-----+----------------+
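For reference, the same per-key lists can be reproduced in plain Python (a sketch, not PySpark; `rows` and `price_lists` are illustrative names):

```python
from collections import defaultdict

# Toy data matching the DataFrame above
rows = [('x', 10), ('x', 15), ('x', 10), ('x', 25),
        ('y', 20), ('y', 10), ('y', 20)]

# collect_list over Window.partitionBy('key') gathers every price in
# the partition, and every row of that partition sees the same list.
price_lists = defaultdict(list)
for key, price in rows:
    price_lists[key].append(price)
```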
So far so good. But if I want an ordered list and add orderBy to my window w, I get:
w = Window.partitionBy('key').orderBy('price')
original_df.withColumn('ordered_list', F.collect_list('price').over(w)).show()
Output:
+---+-----+----------------+
|key|price| ordered_list|
+---+-----+----------------+
| x| 10| [10, 10]|
| x| 10| [10, 10]|
| x| 15| [10, 10, 15]|
| x| 25|[10, 10, 15, 25]|
| y| 10| [10]|
| y| 20| [10, 20, 20]|
| y| 20| [10, 20, 20]|
+---+-----+----------------+
Which means orderBy (kind of) changed the rows in the window as well (similar to what rowsBetween does), which it's not supposed to do. Even though I can fix it by specifying rowsBetween in the window and get the expected results:
w = Window.partitionBy('key').orderBy('price').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
can someone explain why orderBy affects the window in that way?
Spark windows are specified using three parts: partition, order, and frame.
Specifically to your question: orderBy does not only sort the partitioned data, it also changes which rows fall inside the frame. When a window has an orderBy but no explicit frame, Spark uses the default frame RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, which includes every row whose order-key value is less than or equal to the current row's value (ties included). That is why each row's list only reaches up to its own price.
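This default frame can be simulated in plain Python (a sketch, not PySpark; `rows` and `ordered_list` are illustrative names):

```python
# Spark's default frame when a window has orderBy but no explicit frame:
#   RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
# "RANGE" compares order-key *values*, so rows tied with the current
# row are included in the frame.
rows = [('x', 10), ('x', 15), ('x', 10), ('x', 25),
        ('y', 20), ('y', 10), ('y', 20)]

def ordered_list(rows, key, price):
    # Sort the row's partition by the order key
    partition = sorted(p for k, p in rows if k == key)
    # Default RANGE frame: every value <= the current row's order key
    return [p for p in partition if p <= price]
```

This reproduces the surprising output: the row ('x', 10) sees only [10, 10], while ('x', 25) sees the whole partition.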
Below are different windowspec and the corresponding output
Window.orderBy()
+---+-----+----------------------------+
|key|price|price_list |
+---+-----+----------------------------+
|x |15 |[15, 10, 10, 20, 10, 25, 20]|
|x |10 |[15, 10, 10, 20, 10, 25, 20]|
|y |10 |[15, 10, 10, 20, 10, 25, 20]|
|y |20 |[15, 10, 10, 20, 10, 25, 20]|
|x |10 |[15, 10, 10, 20, 10, 25, 20]|
|x |25 |[15, 10, 10, 20, 10, 25, 20]|
|y |20 |[15, 10, 10, 20, 10, 25, 20]|
+---+-----+----------------------------+
Window.partitionBy('key')
+---+-----+----------------+
|key|price| price_list|
+---+-----+----------------+
| x| 15|[15, 10, 10, 25]|
| x| 10|[15, 10, 10, 25]|
| x| 10|[15, 10, 10, 25]|
| x| 25|[15, 10, 10, 25]|
| y| 20| [20, 10, 20]|
| y| 10| [20, 10, 20]|
| y| 20| [20, 10, 20]|
+---+-----+----------------+
Window.partitionBy('key').orderBy('price')
+---+-----+----------------+
|key|price| ordered_list|
+---+-----+----------------+
| x| 10| [10, 10]|
| x| 10| [10, 10]|
| x| 15| [10, 10, 15]|
| x| 25|[10, 10, 15, 25]|
| y| 10| [10]|
| y| 20| [10, 20, 20]|
| y| 20| [10, 20, 20]|
+---+-----+----------------+
Window.partitionBy('key').orderBy(F.desc('price'))
+---+-----+----------------+
|key|price| ordered_list|
+---+-----+----------------+
| x| 25| [25]|
| x| 15| [25, 15]|
| x| 10|[25, 15, 10, 10]|
| x| 10|[25, 15, 10, 10]|
| y| 20| [20, 20]|
| y| 20| [20, 20]|
| y| 10| [20, 20, 10]|
+---+-----+----------------+
Window.partitionBy('key').orderBy('price').rowsBetween(Window.unboundedPreceding, Window.currentRow)
+---+-----+----------------+
|key|price| ordered_list|
+---+-----+----------------+
| x| 10| [10]|
| x| 10| [10, 10]|
| x| 15| [10, 10, 15]|
| x| 25|[10, 10, 15, 25]|
| y| 10| [10]|
| y| 20| [10, 20]|
| y| 20| [10, 20, 20]|
+---+-----+----------------+
Window.partitionBy('key').rowsBetween(Window.unboundedPreceding, Window.currentRow)
+---+-----+----------------+
|key|price| ordered_list|
+---+-----+----------------+
| x| 15| [15]|
| x| 10| [15, 10]|
| x| 10| [15, 10, 10]|
| x| 25|[15, 10, 10, 25]|
| y| 10| [10]|
| y| 20| [10, 20]|
| y| 20| [10, 20, 20]|
+---+-----+----------------+
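As the question already notes, pinning the frame with rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) makes every row see the whole sorted partition again. A plain-Python sketch of that fixed frame (`rows` and `full_sorted` are illustrative names):

```python
rows = [('x', 10), ('x', 15), ('x', 10), ('x', 25),
        ('y', 20), ('y', 10), ('y', 20)]

# rowsBetween(unboundedPreceding, unboundedFollowing) spans the whole
# partition regardless of orderBy, so every row's frame is simply the
# full sorted list of prices for its key.
full_sorted = {key: sorted(p for k, p in rows if k == key)
               for key in dict(rows)}
```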