Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Pyspark: using udf within window

I need to detect threshold values on timeseries with Pyspark. On the example graph below I want to detect (by storing the associated timestamp) each occurrence of the parameter ALT_STD being larger than 5000 and then lower than 5000.

ALT_STD vs Time

For this simple case I can run simple queries such as

t_start = df.select('timestamp')\
                .filter(df.ALT_STD > 5000)\
                .sort('timestamp')\
                .first()
t_stop = df.select('timestamp')\
               .filter((df.ALT_STD < 5000)\                           
                       & (df.timestamp > t_start.timestamp))\
               .sort('timestamp')\
               .first()

However, in some cases, the event can by cyclic and I may have several curves (i.e. several times ALT_STD will raise above or below 5000). Of course, if I use the queries above I will only be able to detect the first occurrences.

I guess I should use window function with an udf, but I can't find a working solution. My guess is that the algorithm should be something like:

windowSpec = Window.partitionBy('flight_hash')\
                   .orderBy('timestamp')\
                   .rowsBetween(Window.currentRow, 1)

def detect_thresholds(x):
    if (x['ALT_STD'][current_row]< 5000) and (x['ALT_STD'][next_row] > 5000):
        return x['timestamp'] #Or maybe simply 1
    if (x['ALT_STD'][current_row]> 5000) and (x['ALT_STD'][current_row] > 5000):
    return x['timestamp'] #Or maybe simply 2
    else:
        return 0

import pyspark.sql.functions as F
detect_udf = F.udf(detect_threshold, IntegerType())
df.withColumn('Result', detect_udf(F.Struct('ALT_STD')).over(windowSpec).show()

Is such an algorithm feasible in Pyspark ? How ?

Post-scriptum: As a side note, I have understood how to use udf or udf and built-in sql window functions but not how to combine udf AND window. e.g. :

# This will compute the mean (built-in function)
df.withColumn("Result", F.mean(df['ALT_STD']).over(windowSpec)).show()

# This will also work
divide_udf = F.udf(lambda x: x[0]/1000., DoubleType())
df.withColumn('result', divide_udf(F.struct('timestamp')))
like image 724
Mike Avatar asked Jan 19 '26 14:01

Mike


1 Answers

No need for udf here (and python udfs cannot be used as window functions). Just use lead / lag with when:

from pyspark.sql.functions import col, lag, lead, when

result = (when((col('ALT_STD') < 5000) & (lead(col('ALT_STD'), 1) > 5000), 1)
    .when(col('ALT_STD') > 5000) & (lead(col('ALT_STD'), 1) < 5000), 1)
    .otherwise(0))

df.withColum("result", result)
like image 75
user9569772 Avatar answered Jan 23 '26 12:01

user9569772



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!