Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why do Window functions fail with "Window function X does not take a frame specification"?

I'm trying to use Spark 1.4 window functions in pyspark 1.4.1

but getting mostly errors or unexpected results. Here is a very simple example that I think should work:

from pyspark.sql.window import Window
import pyspark.sql.functions as func

l = [(1,101),(2,202),(3,303),(4,404),(5,505)]
df = sqlContext.createDataFrame(l,["a","b"])

wSpec = Window.orderBy(df.a).rowsBetween(-1,1)

df.select(df.a, func.rank().over(wSpec).alias("rank"))  
    ==> Failure org.apache.spark.sql.AnalysisException: Window function rank does not take a frame specification.

df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b, func.lead(df.b,1).over(wSpec).alias("next"))  
    ===>  org.apache.spark.sql.AnalysisException: Window function lag does not take a frame specification.;


wSpec = Window.orderBy(df.a)

df.select(df.a, func.rank().over(wSpec).alias("rank"))
    ===> org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException: One or more arguments are expected.

df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b, func.lead(df.b,1).over(wSpec).alias("next")).collect()

    [Row(a=1, prev=None, b=101, next=None), Row(a=2, prev=None, b=202, next=None), Row(a=3, prev=None, b=303, next=None)]

As you can see, if I add rowsBetween frame specification, neither rank() nor lag/lead() window functions recognize it: "Window function does not take a frame specification".

If I omit the rowsBetween frame specification at leas lag/lead() do not throw exceptions but return unexpected (for me) result: always None. And the rank() still doesn't work with different exception.

Can anybody help me to get my window functions right?

UPDATE

All right, that starts to look as a pyspark bug. I have prepared the same test in pure Spark (Scala, spark-shell):

import sqlContext.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val l: List[Tuple2[Int,Int]] = List((1,101),(2,202),(3,303),(4,404),(5,505))
val rdd = sc.parallelize(l).map(i => Row(i._1,i._2))
val schemaString = "a b"
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, IntegerType, true)))
val df = sqlContext.createDataFrame(rdd, schema)

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val wSpec = Window.orderBy("a").rowsBetween(-1,1)
df.select(df("a"), rank().over(wSpec).alias("rank"))
    ==> org.apache.spark.sql.AnalysisException: Window function rank does not take a frame specification.;

df.select(df("a"), lag(df("b"),1).over(wSpec).alias("prev"), df("b"), lead(df("b"),1).over(wSpec).alias("next"))
    ===> org.apache.spark.sql.AnalysisException: Window function lag does not take a frame specification.;


val wSpec = Window.orderBy("a")
df.select(df("a"), rank().over(wSpec).alias("rank")).collect()
    ====> res10: Array[org.apache.spark.sql.Row] = Array([1,1], [2,2], [3,3], [4,4], [5,5])

df.select(df("a"), lag(df("b"),1).over(wSpec).alias("prev"), df("b"), lead(df("b"),1).over(wSpec).alias("next"))
    ====> res12: Array[org.apache.spark.sql.Row] = Array([1,null,101,202], [2,101,202,303], [3,202,303,404], [4,303,404,505], [5,404,505,null])

Even though the rowsBetween cannot be applied in Scala, both rank() and lag()/lead() work as I expect when rowsBetween is omitted.

like image 823
Sergey Shcherbakov Avatar asked Sep 03 '15 13:09

Sergey Shcherbakov


People also ask

Does spark SQL support window functions?

Spark SQL supports three kinds of window functions: ranking functions. analytic functions. aggregate functions.

Which function is associated with window frame?

Using PARTITION BY to Define a Window Frame The window frame is a set of rows related to the current row where the window function is used for calculation. The window frame can be a different set of rows for the next row in the query result, since it depends on the current row being processed.

Why are window functions called window functions?

Because they operate over a "window frame" -- a set of rows relative to the current row, which can be specified with more precision using the ROWS or RANGE keyword.

How does window function work in spark?

Spark Window functions are used to calculate results such as the rank, row number e.t.c over a range of input rows and these are available to you by importing org. apache.


1 Answers

As far as I can tell there two different problems. Window frame definition is simply not supported by Hive GenericUDAFRank, GenericUDAFLag and GenericUDAFLead so errors you see are an expected behavior.

Regarding issue with the following PySpark code

wSpec = Window.orderBy(df.a)
df.select(df.a, func.rank().over(wSpec).alias("rank"))

it looks like it is related to my question https://stackoverflow.com/q/31948194/1560062 and should be addressed by SPARK-9978. As far now you can make it work by changing window definition to this:

wSpec = Window.partitionBy().orderBy(df.a)
like image 121
zero323 Avatar answered Sep 29 '22 01:09

zero323