I'm trying to use Spark 1.4 window functions in pyspark 1.4.1
but getting mostly errors or unexpected results. Here is a very simple example that I think should work:
from pyspark.sql.window import Window
import pyspark.sql.functions as func
l = [(1,101),(2,202),(3,303),(4,404),(5,505)]
df = sqlContext.createDataFrame(l,["a","b"])
wSpec = Window.orderBy(df.a).rowsBetween(-1,1)
df.select(df.a, func.rank().over(wSpec).alias("rank"))
==> Failure org.apache.spark.sql.AnalysisException: Window function rank does not take a frame specification.
df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b, func.lead(df.b,1).over(wSpec).alias("next"))
===> org.apache.spark.sql.AnalysisException: Window function lag does not take a frame specification.;
wSpec = Window.orderBy(df.a)
df.select(df.a, func.rank().over(wSpec).alias("rank"))
===> org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException: One or more arguments are expected.
df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b, func.lead(df.b,1).over(wSpec).alias("next")).collect()
[Row(a=1, prev=None, b=101, next=None), Row(a=2, prev=None, b=202, next=None), Row(a=3, prev=None, b=303, next=None)]
As you can see, if I add the rowsBetween frame specification, neither rank() nor lag()/lead() accepts it: "Window function does not take a frame specification". If I omit the rowsBetween frame specification, at least lag()/lead() do not throw exceptions, but they return unexpected (for me) results: always None. And rank() still fails, with a different exception.
Can anybody help me to get my window functions right?
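To make "unexpected (for me)" concrete, here is a plain-Python sketch (no Spark involved) of the results I expect the window functions to produce over this sample data, ordered by "a":

```python
# Sample data from the example above
l = [(1, 101), (2, 202), (3, 303), (4, 404), (5, 505)]
rows = sorted(l)  # ORDER BY a

# rank(): 1-based position within the ordering (no ties here)
rank = [(a, i + 1) for i, (a, b) in enumerate(rows)]
# [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)]

# lag(b, 1): the previous row's b, or None on the first row
prev = [rows[i - 1][1] if i > 0 else None for i in range(len(rows))]
# [None, 101, 202, 303, 404]

# lead(b, 1): the next row's b, or None on the last row
nxt = [rows[i + 1][1] if i < len(rows) - 1 else None for i in range(len(rows))]
# [202, 303, 404, 505, None]
```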
UPDATE
All right, this is starting to look like a pyspark bug. I have prepared the same test in pure Spark (Scala, spark-shell):
import sqlContext.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val l: List[Tuple2[Int,Int]] = List((1,101),(2,202),(3,303),(4,404),(5,505))
val rdd = sc.parallelize(l).map(i => Row(i._1,i._2))
val schemaString = "a b"
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, IntegerType, true)))
val df = sqlContext.createDataFrame(rdd, schema)
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val wSpec = Window.orderBy("a").rowsBetween(-1,1)
df.select(df("a"), rank().over(wSpec).alias("rank"))
==> org.apache.spark.sql.AnalysisException: Window function rank does not take a frame specification.;
df.select(df("a"), lag(df("b"),1).over(wSpec).alias("prev"), df("b"), lead(df("b"),1).over(wSpec).alias("next"))
===> org.apache.spark.sql.AnalysisException: Window function lag does not take a frame specification.;
val wSpec = Window.orderBy("a")
df.select(df("a"), rank().over(wSpec).alias("rank")).collect()
====> res10: Array[org.apache.spark.sql.Row] = Array([1,1], [2,2], [3,3], [4,4], [5,5])
df.select(df("a"), lag(df("b"),1).over(wSpec).alias("prev"), df("b"), lead(df("b"),1).over(wSpec).alias("next")).collect()
====> res12: Array[org.apache.spark.sql.Row] = Array([1,null,101,202], [2,101,202,303], [3,202,303,404], [4,303,404,505], [5,404,505,null])
Even though the rowsBetween frame specification cannot be applied in Scala either, both rank() and lag()/lead() work as I expect when rowsBetween is omitted.
As far as I can tell there are two different problems. A window frame definition is simply not supported by the Hive GenericUDAFRank, GenericUDAFLag and GenericUDAFLead functions, so the errors you see are expected behavior.
Regarding the issue with the following PySpark code
wSpec = Window.orderBy(df.a)
df.select(df.a, func.rank().over(wSpec).alias("rank"))
it looks like it is related to my question https://stackoverflow.com/q/31948194/1560062 and should be addressed by SPARK-9978. For now, you can make it work by changing the window definition to this:
wSpec = Window.partitionBy().orderBy(df.a)