
first_value windowing function in pyspark

I am using PySpark 1.5, reading my data from Hive tables, and trying to use window functions.

According to this, there is an analytic function called firstValue that gives the first non-null value for a given window. I know it exists in Hive, but I cannot find it anywhere in PySpark.

Is there a way to implement this given that pyspark won't allow UserDefinedAggregateFunctions (UDAFs)?

liber asked Feb 01 '16 23:02




1 Answer

Spark >= 2.0:

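first (from pyspark.sql.functions) takes an optional ignorenulls argument which can mimic the behavior of first_value. Here df and w are the DataFrame and window defined in the Spark < 2.0 example below: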

df.select(col("k"), first("v", True).over(w).alias("fv"))

Spark < 2.0:

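The available function is called first and can be used as follows:

from pyspark.sql.functions import col, first
from pyspark.sql.window import Window

df = sc.parallelize([
    ("a", None), ("a", 1), ("a", -1), ("b", 3)
]).toDF(["k", "v"])

# Partition by k and order by v within each partition
w = Window.partitionBy("k").orderBy("v")

# First value of v in each window frame, nulls included
df.select(col("k"), first("v").over(w).alias("fv"))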

but if you want to ignore nulls you'll have to use Hive UDFs directly (in Spark 1.x this requires sqlContext to be a HiveContext):

df.registerTempTable("df")

sqlContext.sql("""
    SELECT k, first_value(v, TRUE) OVER (PARTITION BY k ORDER BY v)
    FROM df""")
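
To make the difference between the two variants concrete, here is a rough pure-Python sketch of what first over this window returns for the sample data, with and without ignoring nulls. It is only an emulation, not Spark code: first_over_window is a hypothetical helper, and it assumes Spark's defaults of sorting nulls first in ascending order and of a frame that runs from the start of the partition up to the current row.

```python
from itertools import groupby


def first_over_window(rows, ignore_nulls=False):
    """Emulate first(v) OVER (PARTITION BY k ORDER BY v) on (k, v) tuples.

    Hypothetical helper for illustration only. Assumes nulls sort first
    and the frame spans from the partition start to the current row.
    """
    # Sort by key, then nulls first, then by value. The 0 is a placeholder
    # so that None is never compared with an int.
    ordered = sorted(
        rows,
        key=lambda r: (r[0], r[1] is not None, r[1] if r[1] is not None else 0),
    )
    result = []
    for k, group in groupby(ordered, key=lambda r: r[0]):
        values = [v for _, v in group]
        for i, v in enumerate(values):
            # Frame: every row from the partition start up to the current row.
            frame = values[: i + 1]
            if ignore_nulls:
                frame = [x for x in frame if x is not None]
            result.append((k, v, frame[0] if frame else None))
    return result


data = [("a", None), ("a", 1), ("a", -1), ("b", 3)]

with_nulls = first_over_window(data)
without_nulls = first_over_window(data, ignore_nulls=True)

for row in with_nulls:
    print(row)
for row in without_nulls:
    print(row)
```

Under these assumptions, every row of partition "a" gets a null fv when nulls are kept, because the null row sorts first. With nulls ignored, the null row itself still gets null (its frame holds no non-null value yet) and the remaining rows of "a" get -1.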

zero323 answered Sep 19 '22 00:09