I am using PySpark 1.5, getting my data from Hive tables and trying to use windowing functions.
According to this, there exists an analytic function called first_value that will give me the first non-null value for a given window. I know this exists in Hive, but I cannot find it in PySpark anywhere.
Is there a way to implement this, given that PySpark won't allow UserDefinedAggregateFunctions (UDAFs)?
PySpark window functions perform operations such as rank, row number, etc. over a group, frame, or collection of rows and return a result for each row individually. They are also increasingly popular for data transformations.
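For illustration, here is a minimal sketch of ranking functions evaluated over a window; the column names and toy data are invented for the example, and a pyspark shell where sc is already defined is assumed:

from pyspark.sql import Window
from pyspark.sql.functions import rank, row_number

# toy data: one value column "v" per key "k"
df = sc.parallelize([("a", 1), ("a", 2), ("b", 5)]).toDF(["k", "v"])

w = Window.partitionBy("k").orderBy("v")

# rank() and row_number() produce one result per row within its partition
df.select("k", "v",
          rank().over(w).alias("rnk"),
          row_number().over(w).alias("rn")).show()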
UNBOUNDED PRECEDING indicates that the window starts at the first row of the partition; offset PRECEDING indicates that the window starts a number of rows equivalent to the value of offset before the current row. UNBOUNDED PRECEDING is the default. CURRENT ROW indicates the window begins or ends at the current row.
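As a sketch of how these boundaries map onto the DataFrame API (the Window.unboundedPreceding and Window.currentRow constants exist in newer PySpark releases; on older versions the same frame can be written in SQL as ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):

from pyspark.sql import Window
from pyspark.sql.functions import sum as sum_

df = sc.parallelize([("a", 1), ("a", 2), ("a", 4), ("b", 5)]).toDF(["k", "v"])

# running total per key: the frame runs from the first row of the partition
# (UNBOUNDED PRECEDING) up to the current row (CURRENT ROW)
w = (Window.partitionBy("k").orderBy("v")
     .rowsBetween(Window.unboundedPreceding, Window.currentRow))

df.select("k", "v", sum_("v").over(w).alias("running_sum")).show()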
Window function: returns the value that is offset rows after the current row, and default if there are fewer than offset rows after the current row. For example, an offset of one will return the next row at any given point in the window partition. This is equivalent to the LEAD function in SQL and is exposed as lead in PySpark.
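A minimal sketch of lead over an ordered window (toy data invented for the example):

from pyspark.sql import Window
from pyspark.sql.functions import lead

df = sc.parallelize([("a", 1), ("a", 2), ("a", 4), ("b", 5)]).toDF(["k", "v"])

w = Window.partitionBy("k").orderBy("v")

# "next_v" holds the value of "v" from the following row in the partition,
# or null when there is no following row
df.select("k", "v", lead("v", 1).over(w).alias("next_v")).show()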
ntile(n) is a window function that returns the ntile group id (from 1 to n inclusive) in an ordered window partition. For example, if n is 4, the first quarter of the rows will get value 1, the second quarter will get 2, the third quarter will get 3, and the last quarter will get 4.
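And a similar sketch for ntile, again with invented toy data:

from pyspark.sql import Window
from pyspark.sql.functions import ntile

# eight rows under a single key, so ntile(4) yields four buckets of two rows
df = sc.parallelize([("a", i) for i in range(1, 9)]).toDF(["k", "v"])

w = Window.partitionBy("k").orderBy("v")

df.select("k", "v", ntile(4).over(w).alias("quartile")).show()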
Spark >= 2.0:
first takes an optional ignorenulls argument which can mimic the behavior of first_value:
df.select(col("k"), first("v", True).over(w).alias("fv"))
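For context, a self-contained sketch of that Spark >= 2.0 one-liner; it borrows the sample data and window definition from the Spark < 2.0 example below and adds the imports the snippet assumes:

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, first

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [("a", None), ("a", 1), ("a", -1), ("b", 3)], ["k", "v"])

w = Window.partitionBy("k").orderBy("v")

# ignorenulls=True makes first() skip nulls, so "fv" is the first non-null
# "v" within each row's window frame
df.select(col("k"), first("v", ignorenulls=True).over(w).alias("fv")).show()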
Spark < 2.0:
The available function is called first and can be used as follows:
from pyspark.sql import Window
from pyspark.sql.functions import col, first

df = sc.parallelize([
    ("a", None), ("a", 1), ("a", -1), ("b", 3)
]).toDF(["k", "v"])
w = Window().partitionBy("k").orderBy("v")
df.select(col("k"), first("v").over(w).alias("fv"))
but if you want to ignore nulls you'll have to use Hive UDFs directly:
df.registerTempTable("df")

sqlContext.sql("""
    SELECT k, first_value(v, TRUE) OVER (PARTITION BY k ORDER BY v)
    FROM df""")