I see in this Databricks post that there is support for window functions in Spark SQL; in particular, I'm trying to use the lag() window function.
I have rows of credit card transactions, and I've sorted them. Now I want to iterate over the rows and, for each row, display the amount of the transaction and the difference between the current row's amount and the preceding row's amount.
Following the Databricks post, I've come up with this query, but it's throwing an exception at me and I can't quite understand why.
This is in PySpark. tx is my DataFrame, already created and registered as a temp table.
test = sqlContext.sql("SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, (lag(tx.amt) OVER (PARTITION BY tx.cc_num ORDER BY tx.trans_date,tx.trans_time ROW BETWEEN PRECEDING AND CURRENT ROW)) as prev_amt from tx")
and the exception (truncated):
py4j.protocol.Py4JJavaError: An error occurred while calling o76.sql.
: java.lang.RuntimeException: [1.67] failure: ``)'' expected but identifier OVER found
I'd really appreciate any insight; this functionality is rather new and there's not a lot to go on as far as existing examples or other related posts.
Edit
I've also attempted to do this without a SQL statement, as below, but I continue to get an error. I've tried this with both HiveContext and SQLContext, and receive the same error.
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

windowSpec = \
    Window \
        .partitionBy(h_tx_df_ordered['cc_num']) \
        .orderBy(h_tx_df_ordered['cc_num'], h_tx_df_ordered['trans_date'], h_tx_df_ordered['trans_time'])
windowSpec.rowsBetween(-1, 0)
lag_amt = \
    (lag(h_tx_df_ordered['amt']).over(windowSpec) - h_tx_df_ordered['amt'])
h_tx_df_ordered.select(
    h_tx_df_ordered['cc_num'],
    h_tx_df_ordered['trans_date'],
    h_tx_df_ordered['trans_time'],
    h_tx_df_ordered['amt'],
    lag_amt.alias("prev_amt")).show()
Traceback (most recent call last):
File "rdd_raw_data.py", line 116, in <module>
lag_amt.alias("prev_amt")).show()
File "/opt/spark/python/pyspark/sql/dataframe.py", line 721, in select
jdf = self._jdf.select(self._jcols(*cols))
File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/java_gateway.py", line 813, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/protocol.py", line 308, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o152.select.
: org.apache.spark.sql.AnalysisException: Could not resolve window function 'lag'. Note that, using window functions currently requires a HiveContext;
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
It should be ROWS, not ROW. A frame specification requires either a lower bound value:

ROWS BETWEEN 1 PRECEDING AND CURRENT ROW

or the UNBOUNDED keyword:

ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW

The LAG function doesn't accept a frame at all, so a correct SQL query with lag can look like this:
SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, LAG(tx.amt) OVER (
PARTITION BY tx.cc_num ORDER BY tx.trans_date,tx.trans_time
) as prev_amt from tx
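Since the stated goal is the difference between the current and the preceding amount, the subtraction can go in the same query. A minimal sketch, assuming sqlContext is a HiveContext and tx is registered as in the question (amt_diff is an illustrative column name of my choosing):

diff = sqlContext.sql("""
    SELECT tx.cc_num, tx.trans_date, tx.trans_time, tx.amt,
           tx.amt - LAG(tx.amt) OVER (
               PARTITION BY tx.cc_num ORDER BY tx.trans_date, tx.trans_time
           ) AS amt_diff  -- NULL on the first row of each partition
    FROM tx""")
diff.show()

Note that amt_diff is NULL on the first row of each partition, since there is no preceding row for LAG to read.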
Edit:
Regarding SQL DSL usage:
As you can read in the error message:

Note that, using window functions currently requires a HiveContext

Be sure to initialize sqlContext using HiveContext, not SQLContext.
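In PySpark on Spark 1.x that means building the context from HiveContext. A minimal sketch (the app name is arbitrary):

from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext(appName="tx-window-demo")  # arbitrary app name
# HiveContext supports everything SQLContext does, plus window functions,
# so existing SQLContext code keeps working after the swap.
sqlContext = HiveContext(sc)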
windowSpec.rowsBetween(-1, 0) does nothing here: rowsBetween returns a new window specification instead of modifying windowSpec in place, so its result is simply discarded. But once again, a frame specification is not supported by the lag function anyway.
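Putting both fixes together, a sketch of the DSL version, assuming a HiveContext and the h_tx_df_ordered DataFrame from the question (amt_diff is my naming for the difference column):

from pyspark.sql.window import Window
from pyspark.sql.functions import lag

# No rowsBetween call: lag is resolved from the partitioning and ordering alone.
windowSpec = Window \
    .partitionBy(h_tx_df_ordered['cc_num']) \
    .orderBy(h_tx_df_ordered['trans_date'], h_tx_df_ordered['trans_time'])

prev_amt = lag(h_tx_df_ordered['amt']).over(windowSpec)
h_tx_df_ordered.select(
    h_tx_df_ordered['cc_num'],
    h_tx_df_ordered['trans_date'],
    h_tx_df_ordered['trans_time'],
    h_tx_df_ordered['amt'],
    # NULL on the first row of each partition, where there is no previous amount
    (h_tx_df_ordered['amt'] - prev_amt).alias('amt_diff')
).show()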