I work with a large PySpark DataFrame on a cluster and need to write a function that:
- finds runs of consecutive zeros in a specific column and, if a run is shorter than 300 rows, changes them all to 1, and
- then finds runs of consecutive ones in that column and, if a run of ones is shorter than 1800 rows, sets them all to 0.
Every row has a unique timestamp I can sort by.
Is there a way to do this?
Yes. You can follow this example, where I search for streaks of fewer than 3 zeros and convert them to ones:
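from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col

column = 'data'                 # column to scan
date_column = 'timestamp'       # unique, sortable timestamp
output_column = 'data_output'   # where the result is written
min_consecutive_rows = 3        # streaks shorter than this are replaced
search_num = 0                  # value whose streaks are searched for
set_to = 1                      # replacement value for short streaks

# Global ordering by timestamp. Without partitionBy, Spark moves all rows to a single partition.
w = Window.orderBy(date_column)

df = df.withColumn('binary', F.when(col(column) == search_num, 1).otherwise(0))\
    .withColumn('start_streak', F.when(col('binary') != F.lag('binary', 1).over(w), 1).otherwise(0))\
    .withColumn('streak_id', F.sum('start_streak').over(w))\
    .withColumn('streak_counter', F.row_number().over(Window.partitionBy('streak_id').orderBy(date_column)))\
    .withColumn('max_streak_counter', F.max('streak_counter').over(Window.partitionBy('streak_id')))\
    .withColumn(output_column, F.when((col('binary') == 1) & (col('max_streak_counter') < min_consecutive_rows), set_to).otherwise(col(column)))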
The example assumes the data column is called data and the date column is called timestamp.
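The steps performed are:
- binary: 1 where the value equals search_num, otherwise 0.
- start_streak: 1 where binary differs from the previous row's binary (ordered by timestamp), otherwise 0.
- streak_id: running sum of start_streak, so every run of identical binary values gets its own id.
- streak_counter: position of the row within its streak (row_number per streak_id).
- max_streak_counter: length of the streak (maximum of streak_counter per streak_id).
- Output: set_to where binary is 1 and the streak is shorter than min_consecutive_rows, otherwise the original value.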
Here is an example with all the intermediate columns:
+--------------------+----+------+------------+---------+--------------+------------------+-----------+
|           timestamp|data|binary|start_streak|streak_id|streak_counter|max_streak_counter|data_output|
+--------------------+----+------+------------+---------+--------------+------------------+-----------+
|2020-11-11 15:52:...|   1|     0|           0|        0|             1|                 5|          1|
|2020-11-12 15:52:...|   2|     0|           0|        0|             2|                 5|          2|
|2020-11-13 15:52:...|   3|     0|           0|        0|             3|                 5|          3|
|2020-11-14 15:52:...|   4|     0|           0|        0|             4|                 5|          4|
|2020-11-15 15:52:...|   1|     0|           0|        0|             5|                 5|          1|
|2020-11-16 15:52:...|   0|     1|           1|        1|             1|                 2|          1|
|2020-11-17 15:52:...|   0|     1|           0|        1|             2|                 2|          1|
|2020-11-18 15:52:...|   1|     0|           1|        2|             1|                 1|          1|
|2020-11-19 15:52:...|   0|     1|           1|        3|             1|                 4|          0|
|2020-11-20 15:52:...|   0|     1|           0|        3|             2|                 4|          0|
|2020-11-21 15:52:...|   0|     1|           0|        3|             3|                 4|          0|
|2020-11-22 15:52:...|   0|     1|           0|        3|             4|                 4|          0|
+--------------------+----+------+------------+---------+--------------+------------------+-----------+
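To make the intermediate columns in the table easier to follow, here is a plain-Python mirror of the same steps. It is only a sketch for checking the logic on a handful of rows, not a replacement for the Spark code; `streak_columns` is a hypothetical helper name, and the input list is assumed to be already sorted by timestamp. With the data values from the table it reproduces the data_output column.

```python
def streak_columns(data, search_num=0, set_to=1, min_consecutive_rows=3):
    """Hypothetical helper: rebuild the intermediate Spark columns for a list
    of values that is assumed to be sorted by timestamp already."""
    rows = []
    streak_id = 0
    prev_binary = None
    for value in data:
        binary = 1 if value == search_num else 0
        # lag() is null on the first row, so the comparison is not true there
        start_streak = 1 if prev_binary is not None and binary != prev_binary else 0
        streak_id += start_streak  # running sum, like F.sum('start_streak').over(w)
        rows.append({"data": value, "binary": binary,
                     "start_streak": start_streak, "streak_id": streak_id})
        prev_binary = binary

    # row_number() within each streak_id
    counts = {}
    for row in rows:
        counts[row["streak_id"]] = counts.get(row["streak_id"], 0) + 1
        row["streak_counter"] = counts[row["streak_id"]]

    # max(streak_counter) per streak_id is the final count; then apply the rule
    for row in rows:
        row["max_streak_counter"] = counts[row["streak_id"]]
        short = row["binary"] == 1 and row["max_streak_counter"] < min_consecutive_rows
        row["data_output"] = set_to if short else row["data"]
    return rows


table = streak_columns([1, 2, 3, 4, 1, 0, 0, 1, 0, 0, 0, 0])
print([r["data_output"] for r in table])  # [1, 2, 3, 4, 1, 1, 1, 1, 0, 0, 0, 0]
```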
For the second condition, set column to 'data_output', min_consecutive_rows to 1800, search_num to 1 and set_to to 0, then run the same code again on the result.
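If you want to sanity-check the combined result of both passes on a small sample before running the Spark job, a plain-Python version based on run lengths (`itertools.groupby`) is compact. This is a different technique from the window functions above, and `replace_short_runs` and `smooth` are hypothetical names; the defaults 300 and 1800 come from the question. The second pass has to run on the output of the first, because filling a short run of zeros can merge two runs of ones into a longer run.

```python
from itertools import groupby


def replace_short_runs(values, search_num, set_to, min_len):
    """Hypothetical helper: replace every run of `search_num` that is shorter
    than `min_len` with `set_to`; other values are left unchanged."""
    out = []
    for key, group in groupby(values):  # groups consecutive equal values
        run = list(group)
        if key == search_num and len(run) < min_len:
            run = [set_to] * len(run)
        out.extend(run)
    return out


def smooth(values, min_zero_run=300, min_one_run=1800):
    """Both passes from the question: short zero runs -> 1, then short one runs -> 0."""
    step1 = replace_short_runs(values, search_num=0, set_to=1, min_len=min_zero_run)
    return replace_short_runs(step1, search_num=1, set_to=0, min_len=min_one_run)


# Small thresholds so the effect is visible on a short list
print(smooth([1, 1, 0, 1, 1, 1, 0, 0, 0, 1], min_zero_run=2, min_one_run=4))
# [1, 1, 1, 1, 1, 1, 0, 0, 0, 0]
```

In this example the single 0 is filled first, which turns the two short runs of ones into one run of six that survives the second pass; only the trailing single 1 is set to 0.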
For more details about the streak calculation, see this post, which implements similar logic in pandas.