Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

pyspark: count number of consecutive ones/zeros and change them if streak is to short / to long

i work with a large pyspark dataframe on a cluster and need to write a function that:

  1. finds rows of consecutive zeros in a specific column and, if that streak is shorter than 300 rows, change them all to 1 and

  2. then finds periods of consecutive ones in that column and, if that streak of ones is shorter than 1800 rows, set them all to 0.

Every row has a unique timestamp i can sort them by.

is there a way to make that happen?

like image 260
volkerracho Avatar asked Dec 15 '25 07:12

volkerracho


1 Answers

Yes you can follow this example where I searched for strikes of less than 3 zeros and converted them to ones:

column = 'data'
date_column = 'timestamp'
min_consecutive_rows = 3
search_num = 0
set_to = 1

df = df.withColumn('binary', F.when(col(column)==search_num, 1).otherwise(0))\
.withColumn('start_streak', F.when(col('binary') != F.lead('binary', -1).over(w), 1).otherwise(0))\
.withColumn('streak_id', F.sum('start_streak').over(Window.orderBy(date_column)))\
.withColumn("streak_counter", F.row_number().over(Window.partitionBy("streak_id").orderBy(date_column)))\
.withColumn('max_streak_counter', F.max('streak_counter').over(Window.partitionBy("streak_id")))\
.withColumn(column, F.when((col('binary')==1) & (col('max_streak_counter') < min_consecutive_rows), set_to).otherwise(col(column)))

Suppose your data column is called data and your date column is called timestamp.

The performed steps are the following:

  1. binary column is used to search only streaks of the desired search_num number. It allows your data to have other numbers rather than only zeros and ones, still searching only streaks of zeros in this case.
  2. start_streak tell us which rows are the start of a new streak
  3. streak_id creates a unique ID for each streak
  4. streak_counter counts the elements on each streak
  5. max_streak_counter tell us the maximum counter of elements for each streak_id
  6. Finally data_output converts the numbers only if the streak is less than min_consecutive_rows parameter and it is composed by the requested search_num numbers (zeros in this case)

Here an example with all the intermediate columns:

|           timestamp|data|binary|start_streak|streak_id|streak_counter|max_streak_counter|data_output|
+--------------------+----+------+------------+---------+--------------+------------------+-----------+
|2020-11-11 15:52:...|   1|     0|           0|        0|             1|                 5|          1|
|2020-11-12 15:52:...|   2|     0|           0|        0|             2|                 5|          2|
|2020-11-13 15:52:...|   3|     0|           0|        0|             3|                 5|          3|
|2020-11-14 15:52:...|   4|     0|           0|        0|             4|                 5|          4|
|2020-11-15 15:52:...|   1|     0|           0|        0|             5|                 5|          1|
|2020-11-16 15:52:...|   0|     1|           1|        1|             1|                 2|          1|
|2020-11-17 15:52:...|   0|     1|           0|        1|             2|                 2|          1|
|2020-11-18 15:52:...|   1|     0|           1|        2|             1|                 1|          1|
|2020-11-19 15:52:...|   0|     1|           1|        3|             1|                 4|          0|
|2020-11-20 15:52:...|   0|     1|           0|        3|             2|                 4|          0|
|2020-11-21 15:52:...|   0|     1|           0|        3|             3|                 4|          0|
|2020-11-22 15:52:...|   0|     1|           0|        3|             4|                 4|          0|
+--------------------+----+------+------------+---------+--------------+------------------+-----------+

For the second bullet point just change: column to 'data_output', min_consecutive_rows to 1800, search_num to 1, set_to parameter to 0 and repeat the code above.

For more details about the streak calcuation please visit this post that does a similar logic in pandas.

like image 135
Davide Anghileri Avatar answered Dec 16 '25 22:12

Davide Anghileri



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!