
SparkSQL - Lag function?

I see in this Databricks post that there is support for window functions in Spark SQL; in particular, I'm trying to use the lag() window function.

I have rows of credit card transactions, and I've sorted them. Now I want to iterate over the rows and, for each row, display the amount of the transaction and the difference between the current row's amount and the preceding row's amount.

Following the Databricks post, I've come up with this query, but it's throwing an exception at me and I can't quite understand why.

This is in PySpark. tx is my DataFrame, already created and registered as a temp table.

test =sqlContext.sql("SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, (lag(tx.amt) OVER (PARTITION BY tx.cc_num ORDER BY  tx.trans_date,tx.trans_time ROW BETWEEN PRECEDING AND CURRENT ROW)) as prev_amt from tx")

and the exception (truncated):

py4j.protocol.Py4JJavaError: An error occurred while calling o76.sql.
: java.lang.RuntimeException: [1.67] failure: ``)'' expected but identifier OVER found

I'd really appreciate any insight; this functionality is rather new and there's not a lot to go on as far as existing examples or other related posts.

Edit

I've also attempted to do this without a SQL statement, as below, but continue to get an error. I've tried this with both HiveContext and SQLContext, and receive the same error.

windowSpec = \
Window \
    .partitionBy(h_tx_df_ordered['cc_num']) \
    .orderBy(h_tx_df_ordered['cc_num'],h_tx_df_ordered['trans_date'],h_tx_df_ordered['trans_time'])

windowSpec.rowsBetween(-1, 0)

lag_amt = \
   (lag(h_tx_df_ordered['amt']).over(windowSpec) - h_tx_df_ordered['amt'])
h_tx_df_ordered.select(
    h_tx_df_ordered['cc_num'],
    h_tx_df_ordered['trans_date'],
    h_tx_df_ordered['trans_time'],
    h_tx_df_ordered['amt'],
    lag_amt.alias("prev_amt")).show()

Traceback (most recent call last):
  File "rdd_raw_data.py", line 116, in <module>
    lag_amt.alias("prev_amt")).show()
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 721, in select
    jdf = self._jdf.select(self._jcols(*cols))
  File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/java_gateway.py", line 813, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/protocol.py", line 308, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o152.select.
: org.apache.spark.sql.AnalysisException: Could not resolve window function 'lag'. Note that, using window functions currently requires a HiveContext;
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
asked Sep 14 '15 by nameBrandon


1 Answer

  1. Frame specification should start with the keyword ROWS, not ROW.
  2. Frame specification requires either a lower bound value

    ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
    

    or the UNBOUNDED keyword

    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    
  3. The LAG function doesn't accept a frame specification at all, so a correct SQL query with lag can look like this (see the runnable sketch after this list):

    SELECT tx.cc_num, tx.trans_date, tx.trans_time, tx.amt, LAG(tx.amt) OVER (
        PARTITION BY tx.cc_num ORDER BY tx.trans_date, tx.trans_time
    ) AS prev_amt FROM tx
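
For completeness, here is a minimal, runnable PySpark sketch of that corrected query. The appName, the temp-table registration, and the final diff column (the question's current-minus-previous amount) are illustrative assumptions, not part of the original answer; note that sqlContext must be a HiveContext (see the edit below):

    from pyspark import SparkContext
    from pyspark.sql import HiveContext

    sc = SparkContext(appName="lag-example")  # appName is arbitrary
    sqlContext = HiveContext(sc)              # window functions need a HiveContext here

    # Assumes a DataFrame tx_df with the question's columns has already been
    # created, then registered under the name used in the query:
    # tx_df.registerTempTable("tx")

    prev = sqlContext.sql("""
        SELECT cc_num, trans_date, trans_time, amt,
               LAG(amt) OVER (
                   PARTITION BY cc_num
                   ORDER BY trans_date, trans_time
               ) AS prev_amt
        FROM tx
    """)

    # prev_amt is NULL for the first row in each partition,
    # so diff is NULL there as well
    prev.selectExpr("cc_num", "amt", "amt - prev_amt AS diff").show()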
    

Edit:

Regarding SQL DSL usage:

  1. As you can read in the error message

    Note that, using window functions currently requires a HiveContext;

    Be sure to initialize sqlContext using HiveContext, not SQLContext (see the sketch after this list).

  2. windowSpec.rowsBetween(-1, 0) does nothing, since the new WindowSpec it returns is never assigned, but once again, a frame specification is not supported by the lag function anyway.
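
Putting the two points together, a minimal sketch of the DSL version, assuming the question's h_tx_df_ordered DataFrame was created through this HiveContext (the appName is arbitrary):

    from pyspark import SparkContext
    from pyspark.sql import HiveContext
    from pyspark.sql.functions import lag
    from pyspark.sql.window import Window

    sc = SparkContext(appName="lag-dsl-example")
    sqlContext = HiveContext(sc)  # HiveContext, not SQLContext

    # No rowsBetween call: lag does not take a frame specification
    windowSpec = Window \
        .partitionBy(h_tx_df_ordered['cc_num']) \
        .orderBy(h_tx_df_ordered['trans_date'], h_tx_df_ordered['trans_time'])

    # lag(amt) over the window minus the current amt, as in the question's code
    lag_amt = lag(h_tx_df_ordered['amt']).over(windowSpec) - h_tx_df_ordered['amt']

    h_tx_df_ordered.select(
        h_tx_df_ordered['cc_num'],
        h_tx_df_ordered['trans_date'],
        h_tx_df_ordered['trans_time'],
        h_tx_df_ordered['amt'],
        lag_amt.alias("prev_amt")).show()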

answered Oct 05 '22 by zero323