I am performing a rolling median calculation on individual time series dataframes, then I want to concat/append the results.
# Imports used below (test_df and the window spec w are defined elsewhere, not shown)
import numpy as np
from pyspark.sql.functions import col, collect_list, udf
from pyspark.sql.types import FloatType

# UDF for rolling median
median_udf = udf(lambda x: float(np.median(x)), FloatType())
series_list = ['0620', '5914']
SeriesAppend = []
for item in series_list:
    # Filter for select item
    series = test_df.where(col("ID").isin([item]))
    # Sort time series
    series_sorted = series.sort(series.ID,
                                series.date).persist()
    # Calculate rolling median
    series_sorted = series_sorted.withColumn("list",
                                             collect_list("metric").over(w)) \
        .withColumn("rolling_median", median_udf("list"))
    SeriesAppend.append(series_sorted)
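For reference, here is a minimal pure-Python sketch of what the loop computes per series, runnable without Spark. It assumes, hypothetically since `w` is not shown, that `w` is partitioned by series, ordered by date, and covers the current row plus the `window - 1` rows before it, i.e. something like `Window.partitionBy("ID").orderBy("date").rowsBetween(-(window - 1), 0)`. If `w` instead has an `orderBy` but no explicit frame, Spark's default frame runs from the start of the partition to the current row, which gives an expanding median rather than a fixed-width one. The `rolling_median` helper name is illustrative only.

```python
from collections import defaultdict
from statistics import median


def rolling_median(rows, window=3):
    """Trailing-window median per series (hypothetical helper).

    rows: iterable of (series_id, date, metric) tuples.
    Returns (series_id, date, metric, rolling_median) tuples, grouped by
    series_id and ordered by date, mimicking a Spark window that is
    partitioned by ID, ordered by date, with rowsBetween(-(window - 1), 0).
    """
    by_series = defaultdict(list)
    for series_id, date, metric in rows:
        by_series[series_id].append((date, metric))

    out = []
    for series_id in sorted(by_series):
        points = sorted(by_series[series_id])  # order each series by date
        for i, (date, metric) in enumerate(points):
            # Like collect_list("metric").over(w): current row plus up to window-1 earlier rows
            frame = [m for _, m in points[max(0, i - window + 1): i + 1]]
            # Like median_udf: np.median and statistics.median both average the two middle values
            out.append((series_id, date, metric, float(median(frame))))
    return out
```

Each series is windowed independently, which is the same isolation the per-ID filter in the loop provides.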
SeriesAppend
[DataFrame[ntwrk_genre_cd: string, date: date, mkt_cd: string, syscode: string, ntwrk_cd: string, syscode_ntwrk: string, metric: double, list: array<double>, rolling_median: float], DataFrame[ntwrk_genre_cd: string, date: date, mkt_cd: string, syscode: string, ntwrk_cd: string, syscode_ntwrk: string, metric: double, list: array<double>, rolling_median: float]]
When I attempt to call SeriesAppend.show(), I get:
Traceback (most recent call last):
AttributeError: 'list' object has no attribute 'show'
I realize this means the object is a Python list of DataFrames. How do I combine them into a single DataFrame?
I know the following solution works for an explicit set of DataFrames, but I want my for-loop to work regardless of how many DataFrames it produces:
from functools import reduce
from pyspark.sql import DataFrame
dfs = [df1,df2,df3]
df = reduce(DataFrame.unionAll, dfs)
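To see why this works, note that `reduce` folds a two-argument function over a list from left to right, so `reduce(DataFrame.unionAll, [df1, df2, df3])` evaluates `df1.unionAll(df2).unionAll(df3)`. The sketch below uses a hypothetical `FakeFrame` class as a stand-in for a PySpark DataFrame so it runs without Spark; only its `unionAll` method (append rows, no deduplication, like SQL UNION ALL) matters here.

```python
from functools import reduce


class FakeFrame:
    """Hypothetical stand-in for a PySpark DataFrame: just a list of rows."""

    def __init__(self, rows):
        self.rows = list(rows)

    def unionAll(self, other):
        # Like DataFrame.unionAll: rows of self followed by rows of other, duplicates kept
        return FakeFrame(self.rows + other.rows)


df1, df2, df3 = FakeFrame([1]), FakeFrame([2, 2]), FakeFrame([3])

# FakeFrame.unionAll is a plain two-argument function (self, other), just as
# DataFrame.unionAll is, so it can be passed straight to reduce.
combined = reduce(FakeFrame.unionAll, [df1, df2, df3])
print(combined.rows)  # [1, 2, 2, 3]
```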
Is there a way to generalize this so it doesn't depend on explicitly named DataFrames?
Thanks everyone! To sum up: the solution applies functools.reduce with DataFrame.unionAll to the list built in the loop:
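from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, collect_list

# series_list, median_udf, test_df and w as defined in the question
SeriesAppend = []
for item in series_list:
    # Filter for select item
    series = test_df.where(col("ID").isin([item]))
    # Sort time series
    series_sorted = series.sort(series.ID,
                                series.date).persist()
    # Calculate rolling median
    series_sorted = series_sorted.withColumn("list",
                                             collect_list("metric").over(w)) \
        .withColumn("rolling_median", median_udf("list"))
    SeriesAppend.append(series_sorted)

# After the loop: fold the list (of any length) into one DataFrame,
# i.e. SeriesAppend[0].unionAll(SeriesAppend[1]).unionAll(...)
df_series = reduce(DataFrame.unionAll, SeriesAppend)
df_series.show()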
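Because `reduce` accepts a list of any length, the loop can produce however many DataFrames it likes. One caveat: `reduce` without an initial value raises `TypeError` on an empty list, e.g. if `series_list` were empty. A small wrapper, here a hypothetical `union_all` helper, can fail with a clearer message. It assumes every element has a `unionAll(other)` method, as `pyspark.sql.DataFrame` does. Note that `unionAll` (like `union`) matches columns by position, not by name, so `unionByName` is the safer choice if column order could differ between pieces.

```python
from functools import reduce


def union_all(frames):
    """Union any number of DataFrame-like objects into one (hypothetical helper).

    Assumes each element has a unionAll(other) method, as pyspark.sql.DataFrame does.
    """
    frames = list(frames)
    if not frames:
        # reduce() with no initializer raises TypeError on an empty sequence;
        # raise a clearer error instead.
        raise ValueError("union_all() needs at least one DataFrame")
    # Left fold: frames[0].unionAll(frames[1]).unionAll(frames[2]) ...
    return reduce(lambda left, right: left.unionAll(right), frames)
```

With PySpark this would be called as `df_series = union_all(SeriesAppend)`. Separately, if `w` is partitioned by ID (an assumption, since it is not shown), the loop and the union could be skipped altogether: filtering once with `col("ID").isin(series_list)` and applying the window to that single DataFrame computes each series' rolling median independently.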