 

How to efficiently parallelize time series forecasting using dask?

I'm trying to parallelize time series forecasting in Python using Dask. The data is formatted so that each time series is a column, and all columns share a common index of monthly dates. I have a custom forecasting function that returns a time series object containing the fitted and forecasted values. I want to apply this function across all columns of a dataframe (all time series) and return a new dataframe with all of these series, to be uploaded to a DB. I've gotten the code to work by running:

import dask.multiprocessing
import dask.dataframe as dd

data = pandas_df.copy()
ddata = dd.from_pandas(data, npartitions=1)
res = ddata.map_partitions(
    lambda df: df.apply(forecast_func, axis=0)
).compute(get=dask.multiprocessing.get)

My question is: is there a way in Dask to partition by column instead of by row? In this use case I need to keep the ordered time index as is so the forecasting function works correctly.

If not, how would I re-format the data to make efficient large-scale forecasting possible, while still returning the data in the format I need to push to a DB?

Example of the data format (shown as an image in the original question): each time series is a column, and all columns share a monthly date index.
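
A minimal sketch of what such a dataframe might look like (the column names and values here are invented for illustration):

import numpy as np
import pandas as pd

# hypothetical example data: a shared monthly DatetimeIndex,
# one column per time series
dates = pd.date_range('2015-01-01', periods=36, freq='MS')
pandas_df = pd.DataFrame(
    np.random.rand(len(dates), 3),
    index=dates,
    columns=['series_a', 'series_b', 'series_c'],
)
print(pandas_df.head())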

asked Mar 21 '18 by Davis


2 Answers

Thanks for the help, I really appreciate it. I've used the dask.delayed solution and it's working really well; it takes about a third of the time, even just using a local cluster.

For anyone interested, here is the solution I've implemented:

from dask.distributed import Client, LocalCluster
import pandas as pd
import dask

# start a local cluster with 3 worker processes, 3 threads each
cluster = LocalCluster(n_workers=3, threads_per_worker=3)
client = Client(cluster)

# build a list of delayed forecasts, one per time series (column)
output = []
for col in small_df:
    forecasted_series = dask.delayed(custom_forecast_func)(small_df[col])
    output.append(forecasted_series)

# compute all the forecasts in parallel; total is a list of pandas Series
total = dask.delayed(output).compute()

# combine the list of series into one long-format dataframe
full_df = pd.concat(total, ignore_index=False, keys=small_df.columns,
                    names=['time_series_names', 'Date'])
final_df = full_df.to_frame().reset_index()
final_df.columns = ['time_series_names', 'Date', 'value_variable']
final_df.head()

This gives you the melted (long-format) dataframe structure, so if you want the series to be the columns again you can transform it with:

pivoted_df = final_df.pivot(index='Date', columns='time_series_names', values='value_variable')

small_df is a pandas dataframe in the format described in the question, with Date as the index and one column per time series.
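
custom_forecast_func is the user-specific forecasting model; as a rough, runnable stand-in (not the actual function used here), a naive last-value forecaster could look like this:

import pandas as pd

def custom_forecast_func(series):
    # placeholder forecaster: return the observed values plus a naive
    # 12-month-ahead forecast that simply repeats the last observation
    future_index = pd.date_range(series.index[-1], periods=13, freq='MS')[1:]
    forecast = pd.Series(series.iloc[-1], index=future_index)
    return pd.concat([series, forecast])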

answered Oct 17 '22 by Davis


Dask dataframe only partitions data by rows. See the Dask dataframe documentation.

Dask array, however, can partition along any dimension. You have to use NumPy semantics, though, rather than Pandas semantics.
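
For example (a sketch that is not part of the original answer), you could chunk a 2D array so that each chunk holds one whole column and then work block by block:

import numpy as np
import dask.array as da

values = np.random.rand(120, 3)                # 120 months x 3 series
darr = da.from_array(values, chunks=(120, 1))  # one chunk per column

def per_column(block):
    # placeholder per-series computation; a real forecaster would go here
    return np.cumsum(block, axis=0)

result = darr.map_blocks(per_column).compute()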

You can do anything you want with dask delayed or futures. This parallel computing example, given in a more generic tutorial, might give you some ideas.
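
As an illustration of the futures route (again just a sketch, reusing the small_df and custom_forecast_func names from the other answer), you could submit one task per column and gather the results:

from dask.distributed import Client

client = Client()  # or Client(cluster) as in the other answer

# submit one forecasting task per column, then collect the resulting series
futures = [client.submit(custom_forecast_func, small_df[col])
           for col in small_df.columns]
results = client.gather(futures)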

answered Oct 16 '22 by MRocklin