I'm trying to convert my Monte Carlo simulation from numpy into dask, because the arrays sometimes grow too large to fit into memory. I therefore set up a cluster of computers in the cloud: my dask cluster consists of 24 cores and 94 GB of memory. I prepared a simplified version of my code for this question.
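For context, connecting to that cluster looks roughly like this (a minimal sketch; the scheduler address is a placeholder, not my real one):

from dask.distributed import Client

# placeholder scheduler address for the cloud cluster
client = Client("tcp://<scheduler-address>:8786")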
My original numpy code looks like this:
import numpy as np

def numpy_way(sim_count, sim_days, hist_days):
    historical_data = np.random.normal(111.51, 10, hist_days)
    historical_multidim = np.empty(shape=(1, 1, sim_count, hist_days))
    historical_multidim[:, :, :, :] = historical_data
    random_days_panel = np.random.randint(low=1,
                                           high=hist_days,
                                           size=(1, 1, sim_count, sim_days))
    future_panel = historical_multidim[np.arange(1)[:, np.newaxis, np.newaxis, np.newaxis],
                                       np.arange(1)[:, np.newaxis, np.newaxis],
                                       np.arange(sim_count)[:, np.newaxis],
                                       random_days_panel]
    return future_panel.shape
Note: I'm only returning the shape of the array here, but since this is numpy, all elements of future_panel are computed in memory.
Some words about the function:

- historical_data - this is only 1D
- historical_multidim - the historical data broadcast into a 4D array of shape (1, 1, sim_count, hist_days). The first two dimensions are not used here (but they are in my final application)
- sim_days - the number of days to be forecasted in the future
- random_days_panel - just an ndarray of randomly chosen days, so its final shape is (1, 1, sim_count, sim_days) (explained in the previous point)
- future_panel - an ndarray with the randomly picked values from historical_multidim, i.e. an array generated from the historical data with the expected shape (1, 1, sim_count, sim_days)

Now, the problem is that some of these steps are not implemented in dask:

- historical_multidim[:, :, :, :] = historical_data - using stack or broadcast_to is recommended instead (see the sketch right after this list)
- the fancy indexing used to build future_panel is not possible in dask
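A minimal sketch of what that recommendation amounts to (example sizes only, not the ones from my application): instead of filling an empty 4D array, the 1D history can be broadcast lazily:

import dask.array as da

sim_count, hist_days = 1000, 1_000_000  # example sizes only
historical_data = da.random.normal(111.51, 10, size=hist_days, chunks='auto')
# broadcast the 1D series to the 4D panel shape without materializing copies
historical_multidim = da.broadcast_to(historical_data, (1, 1, sim_count, hist_days))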
So I came up with this solution:

import numpy as np
import dask.array as da

def dask_way_1d(sim_count, sim_days, hist_days):
    historical_data = da.random.normal(111.51, 10, size=hist_days, chunks='auto')

    def get_random_days_1d():
        return np.random.randint(low=1, high=hist_days, size=sim_days)

    future_simulations = [historical_data[get_random_days_1d()] for _ in range(sim_count)]
    future_panel = da.stack(future_simulations)
    future_panel = da.broadcast_to(future_panel, shape=(1, 1, sim_count, sim_days))
    future_panel.compute()
    return future_panel.shape
This solution works, but it is much, much slower than the numpy version. The problem is that get_random_days_1d() returns a numpy array. I tried using a dask array instead, but ran into an error when computing historical_data[get_random_days_1d()]:

KilledWorker: ("('normal-932553ab53ba4c7e908d61724430bbb2', 0)", ...
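For clarity, the dask-array variant I mean looks roughly like this (a sketch of the failing attempt, not working code):

def dask_way_1d_dask_index(sim_count, sim_days, hist_days):
    historical_data = da.random.normal(111.51, 10, size=hist_days, chunks='auto')
    # the random indices are now a dask array instead of a numpy array;
    # computing this is what ended in the KilledWorker error on my cluster
    future_simulations = [
        historical_data[da.random.randint(1, hist_days, size=sim_days)]
        for _ in range(sim_count)
    ]
    future_panel = da.stack(future_simulations)
    return da.broadcast_to(future_panel, (1, 1, sim_count, sim_days))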
Another solution looks like this:
def dask_way_nd(sim_count, sim_days, hist_days):
    historical_data_1d = da.random.normal(111.51, 10, size=hist_days, chunks='auto')
    historical_data_2d = da.broadcast_to(historical_data_1d, shape=(sim_count, hist_days))
    random_days_panel = np.random.randint(low=1,
                                          high=hist_days,
                                          size=(sim_count, sim_days))
    future_panel = historical_data_2d[np.arange(sim_count)[:, np.newaxis], random_days_panel]
    future_panel = da.broadcast_to(future_panel, shape=(1, 1, sim_count, sim_days))
    future_panel.compute()
    return future_panel.shape
This solution stops at future_panel = historical_data_2d[np.arange(sim_count)[:, np.newaxis], random_days_panel] with the error: NotImplementedError: Don't yet support nd fancy indexing
So my question is: is there any way to implement the same behavior as in my numpy code? And of course I want to achieve better performance, i.e. faster execution time.
Chunk random_days_panel instead of historical_data and use da.map_blocks:
import numpy as np
import dask.array as da

def dask_way(sim_count, sim_days, hist_days):
    # shared historical data
    # on a cluster you'd load this on each worker, e.g. from a NPZ file
    historical_data = np.random.normal(111.51, 10, size=hist_days)

    random_days_panel = da.random.randint(
        1, hist_days, size=(1, 1, sim_count, sim_days)
    )
    future_panel = da.map_blocks(
        lambda chunk: historical_data[chunk], random_days_panel, dtype=float
    )

    future_panel.compute()
    return future_panel
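On a real cluster you would not bake historical_data into the closure; as the comment says, you'd load it on each worker instead, e.g. from an NPZ file. A minimal sketch of that idea, reusing the imports above; the path and key (historical.npz, "hist") are placeholders:

def dask_way_from_file(sim_count, sim_days, hist_days, path="historical.npz"):
    random_days_panel = da.random.randint(
        1, hist_days, size=(1, 1, sim_count, sim_days)
    )

    def pick(chunk):
        # runs on the worker: load the shared 1D history locally, then index it
        hist = np.load(path)["hist"]  # in practice, cache this per worker
        return hist[chunk]

    return da.map_blocks(pick, random_days_panel, dtype=float)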
This will delegate all the work to the workers and will become faster than pure numpy once your problem gets large enough to amortize the initial (constant) cost for bringing up the scheduler and distributing the context:
hist_days = int(1e6)
sim_days = int(1e5)
sim_count = int(1000)
# dask_way(sim_count, sim_days, hist_days)
# (code in this answer)
532 ms ± 53.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# numpy_way(sim_count, sim_days, hist_days)
# (code in the question)
4.47 s ± 79.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# dask_way_1d(sim_count, sim_days, hist_days)
# (code in the question)
5.76 s ± 91.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
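For reference, timings in this format come from IPython's %timeit; with the parameters above, the comparison can be reproduced along these lines (assuming an IPython session where the three functions are defined):

%timeit dask_way(sim_count, sim_days, hist_days)
%timeit numpy_way(sim_count, sim_days, hist_days)
%timeit dask_way_1d(sim_count, sim_days, hist_days)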