 

How to use window functions in PySpark using DataFrames?

I'm trying to figure out how to use window functions in PySpark. Here's an example of what I'd like to be able to do: simply count the number of times a user has an "event" (in this case, "dt" is a simulated timestamp).

from pyspark.sql.window import Window
from pyspark.sql.functions import count

df = sqlContext.createDataFrame([{"id": 123, "dt": 0}, {"id": 123, "dt": 1}, {"id": 234, "dt":0}, {"id": 456, "dt":0}, {"id": 456, "dt":1}, {"id":456, "dt":2}])
df.select(["id","dt"], count("dt").over(Window.partitionBy("id").orderBy("dt")).alias("count")).show()

This produces an error. What is the correct way to use window functions? I read that Spark 1.4.1 (the version we need to use, since it's what's standard on AWS) should be able to do them with the DataFrame API.

FWIW, the documentation is pretty sparse on this subject. And I had trouble getting any examples actually running.

asked Sep 24 '15 by Evan Zamir


1 Answer

It throws an exception because you pass a list of columns. The signature of DataFrame.select looks as follows:

df.select(self, *cols)

and an expression using a window function is a column like any other, so what you need here is something like this:

w = Window.partitionBy("id").orderBy("dt") # Just for clarity
df.select("id","dt", count("dt").over(w).alias("count")).show()

## +---+---+-----+
## | id| dt|count|
## +---+---+-----+
## |234|  0|    1|
## |456|  0|    1|
## |456|  1|    2|
## |456|  2|    3|
## |123|  0|    1|
## |123|  1|    2|
## +---+---+-----+
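
For what it's worth, select accepts its columns either as separate arguments or as a single list, which is why mixing a list with an additional column expression fails. A minimal sketch, assuming the same df, w, and imports as above:

# Both of these are equivalent ways to pass plain columns:
df.select("id", "dt")      # columns as separate arguments
df.select(["id", "dt"])    # a single list of column names

# But a list cannot be combined with further column expressions,
# which is what the original call attempted:
# df.select(["id", "dt"], count("dt").over(w))  # raises an error

# Pass everything as individual arguments instead:
df.select("id", "dt", count("dt").over(w).alias("count"))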

Generally speaking, Spark SQL window functions behave the same way as in any modern RDBMS.
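
If it helps to see the SQL form, here is a rough equivalent, assuming sqlContext is a HiveContext (which window functions required before Spark 2.0) and using an illustrative temp table name:

# Register the DataFrame so it can be queried with SQL;
# "events" is just a hypothetical name for this example
df.registerTempTable("events")

sqlContext.sql("""
    SELECT id, dt,
           count(dt) OVER (PARTITION BY id ORDER BY dt) AS cnt
    FROM events
""").show()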

answered Sep 28 '22 by zero323