PySpark: How to Append Dataframes in For Loop

I am performing a rolling median calculation on individual time series DataFrames, and then I want to concatenate/append the results into a single DataFrame.

import numpy as np
from pyspark.sql.functions import col, collect_list, udf
from pyspark.sql.types import FloatType

# UDF for rolling median
median_udf = udf(lambda x: float(np.median(x)), FloatType())

series_list = ['0620', '5914']
SeriesAppend = []

for item in series_list:
    # Filter for the selected item
    series = test_df.where(col("ID").isin([item]))
    # Sort the time series
    series_sorted = series.sort(series.ID, series.date).persist()
    # Calculate the rolling median over the window spec w (defined earlier)
    series_sorted = series_sorted.withColumn("list",
            collect_list("metric").over(w)) \
        .withColumn("rolling_median", median_udf("list"))

    SeriesAppend.append(series_sorted)

SeriesAppend

[DataFrame[ntwrk_genre_cd: string, date: date, mkt_cd: string, syscode: string, ntwrk_cd: string, syscode_ntwrk: string, metric: double, list: array, rolling_median: float], DataFrame[ntwrk_genre_cd: string, date: date, mkt_cd: string, syscode: string, ntwrk_cd: string, syscode_ntwrk: string, metric: double, list: array, rolling_median: float]]

When I attempt to .show():

Traceback (most recent call last):
AttributeError: 'list' object has no attribute 'show'

I realize this is saying the object is a list of dataframes. How do I convert to a single dataframe?

I know that the following solution works for an explicit number of dataframes, but I want my for-loop to be agnostic to the number of dataframes:

from functools import reduce
from pyspark.sql import DataFrame

dfs = [df1, df2, df3]
df = reduce(DataFrame.unionAll, dfs)
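
For what it's worth, reduce folds the list pairwise from the left, so with three DataFrames the call above is just shorthand for chaining the unions by hand:

# reduce(DataFrame.unionAll, [df1, df2, df3]) expands to:
df = df1.unionAll(df2).unionAll(df3)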

Is there a way to generalize this to non-explicit dataframe names?

asked May 29 '19 by mwhee



1 Answer

Thanks, everyone! To sum up: the solution uses reduce and unionAll:

from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, collect_list

# median_udf, w, test_df, and series_list are defined as in the question
SeriesAppend = []

for item in series_list:
    # Filter for the selected item
    series = test_df.where(col("ID").isin([item]))
    # Sort the time series
    series_sorted = series.sort(series.ID, series.date).persist()
    # Calculate the rolling median over the window spec w
    series_sorted = series_sorted.withColumn("list",
            collect_list("metric").over(w)) \
        .withColumn("rolling_median", median_udf("list"))

    SeriesAppend.append(series_sorted)

# Fold the list of DataFrames into a single DataFrame
df_series = reduce(DataFrame.unionAll, SeriesAppend)
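
One caveat: since Spark 2.0, unionAll is a deprecated alias for union, and both resolve columns by position. If the per-series DataFrames could ever end up with differing column order, a variant using unionByName (available since Spark 2.3) is safer; this is a sketch under that assumption:

# Alternative fold that matches columns by name instead of position
df_series = reduce(DataFrame.unionByName, SeriesAppend)

Either way, df_series is now a single DataFrame, so df_series.show() works as expected.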
answered Nov 15 '22 by mwhee