In PySpark, there's the concept of coalesce(colA, colB, ...), which, per row, takes the first non-null value it encounters from those columns. However, I want coalesce(rowA, rowB, ...), i.e. the ability to take, per column, the first non-null value it encounters from those rows. I want to coalesce all rows within a group or window of rows.
For example, given the following dataset, I want to coalesce the rows of each category, ordered ascending by date.
+---------+-----------+------+------+
| category|       date|  val1|  val2|
+---------+-----------+------+------+
|        A| 2020-05-01|  null|     1|
|        A| 2020-05-02|     2|  null|
|        A| 2020-05-03|     3|  null|
|        B| 2020-05-01|  null|  null|
|        B| 2020-05-02|     4|  null|
|        C| 2020-05-01|     5|     2|
|        C| 2020-05-02|  null|     3|
|        D| 2020-05-01|  null|     4|
+---------+-----------+------+------+
What I should get as the output is...
+---------+-----------+------+------+
| category|       date|  val1|  val2|
+---------+-----------+------+------+
|        A| 2020-05-01|     2|     1|
|        B| 2020-05-01|     4|  null|
|        C| 2020-05-01|     5|     2|
|        D| 2020-05-01|  null|     4|
+---------+-----------+------+------+
First, I'll give the answer. Then, I'll point out the important bits.
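from pyspark.sql import Window
from pyspark.sql.functions import col, dense_rank, first

df = ...  # dataframe from question description

# Rows of each category, ordered by ascending date (default frame).
window = (
    Window
    .partitionBy("category")
    .orderBy(col("date").asc())
)
# Same partitioning and ordering, but the frame spans the whole partition.
window_unbounded = (
    window
    .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

# Every column except the grouping and ordering keys gets merged.
# The loop variable is named c so it does not shadow the imported col().
cols_to_merge = [c for c in df.columns if c not in ["category", "date"]]
# First non-null value of each column over the whole partition.
merged_cols = [
    first(c, ignorenulls=True).over(window_unbounded).alias(c)
    for c in cols_to_merge
]

df_merged = (
    df
    .select([col("category"), col("date")] + merged_cols)
    # Keep only the earliest-dated row(s) of each category.
    .withColumn("rank_col", dense_rank().over(window))
    .filter(col("rank_col") == 1)
    .drop("rank_col")
)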
The row-wise analogue to coalesce is the aggregation function first. Specifically, we use first with ignorenulls = True so that we find the first non-null value.
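To make the semantics concrete, here is a minimal plain-Python model of what first with ignorenulls = True computes over one ordered group. It is not Spark code: first_non_null is a hypothetical helper written for illustration, and None stands in for null.

```python
from typing import Iterable, Optional, TypeVar

T = TypeVar("T")


def first_non_null(values: Iterable[Optional[T]]) -> Optional[T]:
    """Return the first value that is not None, or None if there is none."""
    for value in values:
        if value is not None:
            return value
    return None


# val1 and val2 for category A, already ordered by ascending date.
val1_a = [None, 2, 3]
val2_a = [1, None, None]

merged_a = (first_non_null(val1_a), first_non_null(val2_a))
print(merged_a)  # (2, 1)
```

This matches the expected row for category A in the question: val1 = 2 and val2 = 1.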
When we use first, we have to be careful about the ordering of the rows it's applied to. Because groupBy doesn't allow us to maintain order within the groups, we use a Window.
The window itself must be unbounded on both ends rather than using the default frame (unbounded preceding to current row, which applies once the window has an orderBy); otherwise the first aggregation would see only a subset of the group for some rows.
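The effect of the frame can be sketched in plain Python. This is only a model, not Spark code: it assumes dates are distinct within the category (so the range-based default frame behaves like a running slice), and first_non_null is a hypothetical helper.

```python
from typing import List, Optional


def first_non_null(values: List[Optional[int]]) -> Optional[int]:
    """Return the first value that is not None, or None if there is none."""
    return next((v for v in values if v is not None), None)


# val1 for category A, ordered by ascending date.
val1_a = [None, 2, 3]

# Default frame: each row only sees rows from the start up to itself.
running = [first_non_null(val1_a[: i + 1]) for i in range(len(val1_a))]

# Unbounded frame: each row sees the whole partition.
unbounded = [first_non_null(val1_a) for _ in val1_a]

print(running)    # [None, 2, 2]
print(unbounded)  # [2, 2, 2]
```

Under the default frame the earliest row, which is the one the rank filter keeps, would still hold null for val1. With the unbounded frame every row carries the merged value, so it does not matter which row survives the filter.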
After we aggregate over the window, we alias the column back to its original name to keep the column names consistent.
We use a single select statement of columns rather than a for loop with df.withColumn(col, ...) because the select statement greatly reduces the query plan depth. If you use the looped withColumn, you might hit a stack overflow error when you have too many columns.
Finally, we run a dense_rank over our window, this time using the window with the default range, and filter to only the first-ranked rows. We use dense_rank here, but we could use any ranking function, whatever fits our needs.
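The choice of ranking function only matters when the earliest date is duplicated within a category. The plain-Python sketch below models the difference between dense_rank and row_number on already-sorted keys. The functions are simplified stand-ins for the Spark ones, and the dates list is a made-up partition that is not part of the question's dataset.

```python
from typing import List


def dense_rank(sorted_keys: List[str]) -> List[int]:
    """Rank sorted keys; equal keys share a rank and ranks have no gaps."""
    ranks: List[int] = []
    for i, key in enumerate(sorted_keys):
        if i == 0:
            ranks.append(1)
        elif key == sorted_keys[i - 1]:
            ranks.append(ranks[-1])
        else:
            ranks.append(ranks[-1] + 1)
    return ranks


def row_number(sorted_keys: List[str]) -> List[int]:
    """Number rows 1..n in order, regardless of ties."""
    return list(range(1, len(sorted_keys) + 1))


# Hypothetical partition in which the earliest date appears twice.
dates = ["2020-05-01", "2020-05-01", "2020-05-02"]

print(dense_rank(dates))  # [1, 1, 2]
print(row_number(dates))  # [1, 2, 3]
```

Filtering on rank 1 keeps both tied rows with dense_rank but exactly one with row_number. In this pipeline the tied rows would be identical anyway, since every merged column holds the same value across the partition, so row_number is a reasonable swap if one row per category is required.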