I'm trying to parallelize time series forecasting in Python using Dask. The data is formatted so that each time series is a column, and all columns share a common index of monthly dates. I have a custom forecasting function that returns a time series object containing the fitted and forecasted values. I want to apply this function across all columns of a DataFrame (i.e., all time series) and return a new DataFrame with all of the resulting series, to be uploaded to a DB. I've gotten the code to work by running:
import dask.dataframe as dd

data = pandas_df.copy()
ddata = dd.from_pandas(data, npartitions=1)
# scheduler='processes' replaces the deprecated get=dask.multiprocessing.get
res = ddata.map_partitions(
    lambda df: df.apply(forecast_func, axis=0)
).compute(scheduler='processes')
My question is: is there a way in Dask to partition by column instead of by row? In this use case I need to keep the ordered time index intact for the forecasting function to work correctly.
If not, how would I reformat the data so that efficient large-scale forecasting is possible, while still returning the results in the format I need to push to a DB?
Example of the data format (the original post included an image here):
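Based on the description above, one column per series with a common monthly date index, a toy frame in the same shape might look like this; the column names and values are illustrative only:

import numpy as np
import pandas as pd

# hypothetical stand-in for the screenshot: a monthly Date index,
# one column per time series
dates = pd.date_range('2015-01-01', periods=36, freq='MS')
pandas_df = pd.DataFrame(np.random.rand(36, 3) * 100,
                         index=dates,
                         columns=['series_a', 'series_b', 'series_c'])
pandas_df.index.name = 'Date'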
Thanks for the help, I really appreciate it. I've used the dask.delayed solution and it's working really well; it takes about a third of the time, even just using a local cluster.
For anyone interested, the solution I've implemented:
from dask.distributed import Client, LocalCluster
import pandas as pd
import dask

# threads_per_worker replaces the deprecated ncores argument
cluster = LocalCluster(n_workers=3, threads_per_worker=3)
client = Client(cluster)

# build a list of delayed forecast tasks, one per time series column
output = []
for col in small_df:
    forecasted_series = dask.delayed(custom_forecast_func)(small_df[col])
    output.append(forecasted_series)
total = dask.delayed(output).compute()

# combine the list of series into one long-format dataframe
full_df = pd.concat(total, ignore_index=False, keys=small_df.columns,
                    names=['time_series_names', 'Date'])
final_df = full_df.to_frame().reset_index()
final_df.columns = ['time_series_names', 'Date', 'value_variable']
final_df.head()
This gives you the melted (long) dataframe structure, so if you want the series back as columns you can transform it with:
pivoted_df = final_df.pivot(index='Date', columns='time_series_names', values='value_variable')
small_df is a pandas DataFrame in the format shown in the question's example, with Date as the index and one column per time series.
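Since the end goal was uploading the results to a DB, here is a minimal sketch of that last step using pandas' to_sql, assuming a SQLAlchemy engine; the connection string and table name are placeholders:

from sqlalchemy import create_engine

# hypothetical connection string and table name
engine = create_engine('postgresql://user:password@host:5432/dbname')
final_df.to_sql('forecasts', engine, if_exists='append', index=False)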
Dask dataframe only partitions data by rows; see the Dask dataframe documentation.
Dask array, however, can partition along any dimension. You have to use NumPy semantics, though, rather than Pandas semantics.
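As a rough illustration of chunking along columns with dask array, here is a sketch; the shapes and the demeaning function are made-up stand-ins, not the forecasting function from the question:

import numpy as np
import dask.array as da

# 120 monthly observations for 50 series; chunk along columns only,
# so each block holds the full, ordered time index for 10 series
values = np.random.rand(120, 50)
darr = da.from_array(values, chunks=(120, 10))

# map_blocks applies a function to each column block; a simple
# demeaning stands in here for a real per-series computation
result = darr.map_blocks(lambda block: block - block.mean(axis=0)).compute()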
You can do anything you want with dask delayed or futures. This parallel computing example from a more generic tutorial might give you some ideas.
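For completeness, a minimal sketch of the futures variant of the same per-column pattern, reusing the small_df and custom_forecast_func names from the question (it assumes a running Client):

from dask.distributed import Client
import pandas as pd

client = Client()  # starts a local cluster by default

# submit one task per column; futures start computing immediately
futures = [client.submit(custom_forecast_func, small_df[col])
           for col in small_df.columns]

# gather blocks until all results are back
results = client.gather(futures)
forecasts = pd.concat(results, axis=1, keys=small_df.columns)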