How does orderBy affect Window.partitionBy in a PySpark DataFrame?

Let me explain my question with an example.
Assume we have the following dataframe:

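from pyspark.sql import SparkSession, Window, functions as F
spark = SparkSession.builder.getOrCreate()  # createDataFrame is a SparkSession method
original_df = spark.createDataFrame([('x', 10,), ('x', 15,), ('x', 10,), ('x', 25,), ('y', 20,), ('y', 10,), ('y', 20,)], ["key", "price"] )
original_df.show()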

Output:

+---+-----+
|key|price|
+---+-----+
|  x|   10|
|  x|   15|
|  x|   10|
|  x|   25|
|  y|   20|
|  y|   10|
|  y|   20|
+---+-----+

And assume I want to get a list of prices for each key using a window:

w = Window.partitionBy('key')
original_df.withColumn('price_list', F.collect_list('price').over(w)).show()

Output:

+---+-----+----------------+
|key|price|      price_list|
+---+-----+----------------+
|  x|   10|[10, 15, 10, 25]|
|  x|   15|[10, 15, 10, 25]|
|  x|   10|[10, 15, 10, 25]|
|  x|   25|[10, 15, 10, 25]|
|  y|   20|    [20, 10, 20]|
|  y|   10|    [20, 10, 20]|
|  y|   20|    [20, 10, 20]|
+---+-----+----------------+

So far so good.
But if I want to get an ordered list, and I add orderBy to my window w I get:

w = Window.partitionBy('key').orderBy('price')
original_df.withColumn('ordered_list', F.collect_list('price').over(w)).show()

Output:

+---+-----+----------------+
|key|price|    ordered_list|
+---+-----+----------------+
|  x|   10|        [10, 10]|
|  x|   10|        [10, 10]|
|  x|   15|    [10, 10, 15]|
|  x|   25|[10, 10, 15, 25]|
|  y|   10|            [10]|
|  y|   20|    [10, 20, 20]|
|  y|   20|    [10, 20, 20]|
+---+-----+----------------+

This means orderBy also changed which rows are included in the window for each row (much as rowsBetween does), which I did not expect it to do.

Even though I can fix this by specifying rowsBetween in the window and get the expected results,

w = Window.partitionBy('key').orderBy('price').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

can someone explain why orderBy affects the window in that way?

Ala Tarighati asked Dec 06 '18 08:12



1 Answer

A Spark window is specified by three parts: partition, order, and frame.

  1. When none of the parts is specified, the whole dataset is treated as a single window.
  2. When a partition is specified using a column, one window is created per distinct value of that column. If only the partition is specified, then when a function is evaluated for a row, all the rows in that partition are taken into account. That's why you see all 4 values [10, 15, 10, 25] for every row in partition x.
  3. When both partition and ordering are specified, the default frame becomes a range frame from unboundedPreceding to currentRow: when a function is evaluated for a row, it includes every row in the partition whose ordering value is the same as or lower than the current row's (for the default ascending order). In your case, the first row gets [10, 10] because there are 2 rows in the partition with the same price, and rows that tie on the ordering value are included together.
  4. When a frame is specified with rowsBetween or rangeBetween, the evaluation picks only the rows that match the frame rule. For example, if unboundedPreceding and currentRow are specified, it picks the current row and all rows that occur before it. If orderBy is specified, it changes which rows occur before the current row accordingly.
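The frame rules in the list above can be emulated in plain Python for a single partition. This is only an illustrative sketch, not Spark's implementation: the function name `collect_list_over` and the frame labels (`"unbounded"`, `"rows_to_current"`, `"range_to_current"`) are made up for this example, and the ordering is assumed to be ascending on the value itself.

```python
def collect_list_over(values, ordered, frame="default"):
    """Emulate collect_list over one partition (hypothetical helper).

    values  : the prices of one partition, in their incoming order
    ordered : True if the window has orderBy('price'), else False
    frame   : "default", "unbounded", "rows_to_current" or "range_to_current"
    """
    rows = sorted(values) if ordered else list(values)
    if frame == "default":
        # With ordering the default is a range frame up to the current row;
        # without ordering it is the whole partition.
        frame = "range_to_current" if ordered else "unbounded"
    if frame == "range_to_current" and not ordered:
        raise ValueError("a range frame needs an ordering")

    result = []
    for i, current in enumerate(rows):
        if frame == "unbounded":
            window = list(rows)
        elif frame == "rows_to_current":
            # Physical rows: everything up to and including position i.
            window = rows[: i + 1]
        elif frame == "range_to_current":
            # Logical range: every row whose value is <= the current value,
            # so rows that tie with the current row are included too.
            window = [v for v in rows if v <= current]
        else:
            raise ValueError(f"unknown frame: {frame}")
        result.append((current, window))
    return result


x_prices = [10, 15, 10, 25]  # partition 'x' from the question
for label, out in [
    ("partitionBy only", collect_list_over(x_prices, ordered=False)),
    ("partitionBy + orderBy", collect_list_over(x_prices, ordered=True)),
    ("orderBy + rows frame", collect_list_over(x_prices, ordered=True, frame="rows_to_current")),
]:
    print(label, out)
```

The second case reproduces the [10, 10] seen on both price-10 rows, while the rows-based frame gives [10] and then [10, 10], because it counts physical rows rather than ordering values.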

Specifically to your question: orderBy not only sorts the partitioned data, it also changes the default frame, and therefore which rows are selected for each row.

Below are different window specifications and the corresponding outputs:

Window.orderBy()
+---+-----+----------------------------+
|key|price|price_list                  |
+---+-----+----------------------------+
|x  |15   |[15, 10, 10, 20, 10, 25, 20]|
|x  |10   |[15, 10, 10, 20, 10, 25, 20]|
|y  |10   |[15, 10, 10, 20, 10, 25, 20]|
|y  |20   |[15, 10, 10, 20, 10, 25, 20]|
|x  |10   |[15, 10, 10, 20, 10, 25, 20]|
|x  |25   |[15, 10, 10, 20, 10, 25, 20]|
|y  |20   |[15, 10, 10, 20, 10, 25, 20]|
+---+-----+----------------------------+

Window.partitionBy('key')
+---+-----+----------------+
|key|price|      price_list|
+---+-----+----------------+
|  x|   15|[15, 10, 10, 25]|
|  x|   10|[15, 10, 10, 25]|
|  x|   10|[15, 10, 10, 25]|
|  x|   25|[15, 10, 10, 25]|
|  y|   20|    [20, 10, 20]|
|  y|   10|    [20, 10, 20]|
|  y|   20|    [20, 10, 20]|
+---+-----+----------------+

Window.partitionBy('key').orderBy('price')
+---+-----+----------------+
|key|price|    ordered_list|
+---+-----+----------------+
|  x|   10|        [10, 10]|
|  x|   10|        [10, 10]|
|  x|   15|    [10, 10, 15]|
|  x|   25|[10, 10, 15, 25]|
|  y|   10|            [10]|
|  y|   20|    [10, 20, 20]|
|  y|   20|    [10, 20, 20]|
+---+-----+----------------+

Window.partitionBy('key').orderBy(F.desc('price'))
+---+-----+----------------+
|key|price|    ordered_list|
+---+-----+----------------+
|  x|   25|            [25]|
|  x|   15|        [25, 15]|
|  x|   10|[25, 15, 10, 10]|
|  x|   10|[25, 15, 10, 10]|
|  y|   20|        [20, 20]|
|  y|   20|        [20, 20]|
|  y|   10|    [20, 20, 10]|
+---+-----+----------------+

Window.partitionBy('key').orderBy('price').rowsBetween(Window.unboundedPreceding, Window.currentRow)
+---+-----+----------------+
|key|price|    ordered_list|
+---+-----+----------------+
|  x|   10|            [10]|
|  x|   10|        [10, 10]|
|  x|   15|    [10, 10, 15]|
|  x|   25|[10, 10, 15, 25]|
|  y|   10|            [10]|
|  y|   20|        [10, 20]|
|  y|   20|    [10, 20, 20]|
+---+-----+----------------+

Window.partitionBy('key').rowsBetween(Window.unboundedPreceding, Window.currentRow)
+---+-----+----------------+
|key|price|    ordered_list|
+---+-----+----------------+
|  x|   15|            [15]|
|  x|   10|        [15, 10]|
|  x|   10|    [15, 10, 10]|
|  x|   25|[15, 10, 10, 25]|
|  y|   10|            [10]|
|  y|   20|        [10, 20]|
|  y|   20|    [10, 20, 20]|
+---+-----+----------------+
Manoj Singh answered Oct 29 '22 15:10