
SPARK SQL Equivalent of Qualify + Row_number statements

Does anyone know the best way to get the same result in Apache Spark SQL as Teradata's QUALIFY clause combined with rank or row_number?

For example:

  • I have a Spark DataFrame called statement_data with 12 monthly records for each of 100 unique account numbers (acct_id), so 1,200 records in total
  • Each monthly record has a field called "statement_date" that can be used for determining the most recent record

I want my final result to be a new Spark DataFrame with the 3 most recent records (ordered by statement_date descending) for each of the 100 unique account numbers, so 300 records in total.

In standard Teradata SQL, I can do the following:

select * from statement_data
qualify row_number ()
over(partition by acct_id order by statement_date desc) <= 3

As far as I can tell, Apache Spark SQL has no QUALIFY clause. Either I am getting the syntax wrong, or I cannot find the documentation showing that it exists.

It is fine if I need to do this in two steps as long as those two steps are:

  • A select query or alternative method to assign rank/row numbering for each account_number's records
  • A select query where I'm selecting all records with rank <= 3 (i.e. choose 1st, 2nd, and 3rd most recent records).
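The two steps above can be pictured outside Spark in plain Java. This is only a sketch of what the ranking and filtering compute, not Spark API code. The class TopStatementsSketch, its Statement type, the "acct-N" identifiers, and the 2015 sample dates are all hypothetical names and data made up for illustration.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TopStatementsSketch {

    /** Hypothetical stand-in for one row of statement_data. */
    static final class Statement {
        final String acctId;
        final LocalDate statementDate;
        int rowNumber; // filled in by step 1

        Statement(String acctId, LocalDate statementDate) {
            this.acctId = acctId;
            this.statementDate = statementDate;
        }
    }

    /** Step 1: number each account's rows 1..n, most recent statement first. */
    static List<Statement> assignRowNumbers(List<Statement> rows) {
        // Equivalent of PARTITION BY acct_id: group rows by account.
        Map<String, List<Statement>> byAccount = new LinkedHashMap<>();
        for (Statement s : rows) {
            byAccount.computeIfAbsent(s.acctId, k -> new ArrayList<>()).add(s);
        }
        List<Statement> numbered = new ArrayList<>();
        for (List<Statement> partition : byAccount.values()) {
            // Equivalent of ORDER BY statement_date DESC within the partition.
            partition.sort(Comparator.comparing((Statement s) -> s.statementDate).reversed());
            int n = 0;
            for (Statement s : partition) {
                s.rowNumber = ++n; // mutates the row in place
                numbered.add(s);
            }
        }
        return numbered;
    }

    /** Step 2: keep only rows whose row number is at most maxRank. */
    static List<Statement> keepTop(List<Statement> numbered, int maxRank) {
        List<Statement> kept = new ArrayList<>();
        for (Statement s : numbered) {
            if (s.rowNumber <= maxRank) {
                kept.add(s);
            }
        }
        return kept;
    }

    /** Made-up sample matching the question's shape: 100 accounts x 12 monthly rows. */
    static List<Statement> sampleData() {
        List<Statement> rows = new ArrayList<>();
        for (int acct = 1; acct <= 100; acct++) {
            for (int month = 1; month <= 12; month++) {
                rows.add(new Statement("acct-" + acct, LocalDate.of(2015, month, 1)));
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Statement> top3 = keepTop(assignRowNumbers(sampleData()), 3);
        System.out.println(top3.size() + " rows kept"); // prints "300 rows kept"
    }
}
```

Keeping the numbering and the filter as separate methods mirrors the inner and outer SELECT of a subquery: the row number has to exist as a column before a WHERE clause can test it.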

EDIT 1 - 7/23 2:09pm: The initial solution provided by zero323 was not working for me in Spark 1.4.1 with Spark SQL 1.4.1 dependency installed.

EDIT 2 - 7/23 3:24pm: It turns out the error came from running my query through a SQLContext instead of a HiveContext. The solution below now runs correctly after I added the following code to create and use a HiveContext:

final JavaSparkContext sc2;
final HiveContext hc2;
DataFrame df;
hc2 = TestHive$.MODULE$;
sc2 = new JavaSparkContext(hc2.sparkContext()); 
....
// Initial Spark/SQL contexts to set up Dataframes  
SparkConf conf = new SparkConf().setAppName("Statement Test");
...
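// Note: this ranks by stmt_curr_bal; order by stmt_end_dt DESC instead to rank by recency.
DataFrame stmtSummary = hc2.sql(
    "SELECT * FROM ("
    + " SELECT acct_id, stmt_end_dt, stmt_curr_bal,"
    + " row_number() OVER (PARTITION BY acct_id ORDER BY stmt_curr_bal DESC) rank_num"
    + " FROM stmt_data"
    + ") tmp WHERE rank_num <= 3");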

asked Jul 21 '15 20:07 by Brian Correro



1 Answer

Spark SQL has no QUALIFY clause (checking the parser source is usually a quick way to confirm this kind of thing), but you can get the same result with a subquery like this:

SELECT * FROM (
    SELECT *, row_number() OVER (
        PARTITION BY acct_id ORDER BY statement_date DESC
    ) rank FROM df
) tmp WHERE rank <= 3

See also SPARK : failure: ``union'' expected but `(' found

answered Nov 03 '22 01:11 by zero323