I'm trying to convert my Monte Carlo simulation from numpy to dask, because the arrays are sometimes too large to fit into memory. I therefore set up a cluster of machines in the cloud: my dask cluster consists of 24 cores and 94 GB of memory. I prepared a simplified version of my code for this question.
My original numpy code looks like this:
import numpy as np

def numpy_way(sim_count, sim_days, hist_days):
    # generate 1D historical data and broadcast it into a 4D panel
    historical_data = np.random.normal(111.51, 10, hist_days)
    historical_multidim = np.empty(shape=(1, 1, sim_count, hist_days))
    historical_multidim[:, :, :, :] = historical_data
    # randomly chosen day indices for each simulation
    random_days_panel = np.random.randint(low=1,
                                          high=hist_days,
                                          size=(1, 1, sim_count, sim_days))
    # fancy indexing: pick the historical values for the random days
    future_panel = historical_multidim[np.arange(1)[:, np.newaxis, np.newaxis, np.newaxis],
                                       np.arange(1)[:, np.newaxis, np.newaxis],
                                       np.arange(sim_count)[:, np.newaxis],
                                       random_days_panel]
    return future_panel.shape
Note: I'm only returning the shape of the numpy array here (but since it is numpy, the elements of future_panel are computed in memory).
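For reference, this is how the function can be called; the parameter values below are just an illustration, not the sizes from my real application:

sim_count, sim_days, hist_days = 1000, 100, 1000   # hypothetical example sizes
print(numpy_way(sim_count, sim_days, hist_days))   # -> (1, 1, 1000, 100)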
Some words about the function:
- historical_data - this is only 1D; it is broadcast into historical_multidim. The first two dimensions are not used here (but they are in my final application).
- random_days_panel - is just an ndarray of randomly chosen days (indices into the historical data) for each simulated day forecasted in the future. Its final shape is (1, 1, sim_count, sim_days).
- future_panel - is an ndarray with the randomly picked values from historical_multidim, i.e. an array generated from the historical data with the expected shape (1, 1, sim_count, sim_days).
Now, the problem is that some of these steps are not implemented in dask:
- historical_multidim[:, :, :, :] = historical_data - using stack or broadcast_to is recommended instead
- the fancy indexing used to build future_panel is not possible in dask
So I came up with this solution:
import dask.array as da

def dask_way_1d(sim_count, sim_days, hist_days):
    historical_data = da.random.normal(111.51, 10, size=hist_days, chunks='auto')

    def get_random_days_1d():
        # numpy array of random day indices for a single simulation
        return np.random.randint(low=1, high=hist_days, size=sim_days)

    # build each simulation separately, then stack them
    future_simulations = [historical_data[get_random_days_1d()] for _ in range(sim_count)]
    future_panel = da.stack(future_simulations)
    future_panel = da.broadcast_to(future_panel, shape=(1, 1, sim_count, sim_days))
    future_panel.compute()
    return future_panel.shape
This solution works, however it is much, much slower than the numpy solution. The problem is that get_random_days_1d() returns a numpy array. I tried to use a dask array for the indices as well, but I ran into an error when computing historical_data[get_random_days_1d()]: KilledWorker: ("('normal-932553ab53ba4c7e908d61724430bbb2', 0)", ...
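For clarity, the dask-based index generation I mention would look roughly like this (a sketch only; the function name and chunking argument are illustrative, and it would replace get_random_days_1d inside dask_way_1d):

def get_random_days_1d_dask():
    # lazy dask variant of get_random_days_1d
    return da.random.randint(low=1, high=hist_days, size=sim_days, chunks='auto')

# indexing the dask array with another dask array is what triggered the KilledWorker
future_simulations = [historical_data[get_random_days_1d_dask()] for _ in range(sim_count)]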
Another solution looks like this:
def dask_way_nd(sim_count, sim_days, hist_days):
    historical_data_1d = da.random.normal(111.51, 10, size=hist_days, chunks='auto')
    historical_data_2d = da.broadcast_to(historical_data_1d, shape=(sim_count, hist_days))
    random_days_panel = np.random.randint(low=1,
                                          high=hist_days,
                                          size=(sim_count, sim_days))
    # this fancy-indexing step is where dask raises NotImplementedError (see below)
    future_panel = historical_data_2d[np.arange(sim_count)[:, np.newaxis], random_days_panel]
    future_panel = da.broadcast_to(future_panel, shape=(1, 1, sim_count, sim_days))
    future_panel.compute()
    return future_panel.shape
This solution fails at future_panel = historical_data_2d[np.arange(sim_count)[:, np.newaxis], random_days_panel] with: NotImplementedError: Don't yet support nd fancy indexing
So my question is: is there any way to implement the same behavior as in my numpy code, but of course with better performance (i.e. faster execution time)?
Chunk random_days_panel instead of historical_data and use da.map_blocks:
import numpy as np
import dask.array as da

def dask_way(sim_count, sim_days, hist_days):
    # shared historical data
    # on a cluster you'd load this on each worker, e.g. from an NPZ file
    historical_data = np.random.normal(111.51, 10, size=hist_days)

    random_days_panel = da.random.randint(
        1, hist_days, size=(1, 1, sim_count, sim_days)
    )
    # each chunk of indices is turned into values by indexing the in-memory
    # historical data, so no large intermediate panel is ever materialized
    future_panel = da.map_blocks(
        lambda chunk: historical_data[chunk], random_days_panel, dtype=float
    )

    future_panel.compute()
    return future_panel
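If you want to run this against your remote cluster rather than the default local scheduler, connect a distributed client first; the scheduler address below is a placeholder for your own:

from dask.distributed import Client

client = Client("tcp://<scheduler-address>:8786")  # placeholder: your cluster's scheduler
future_panel = dask_way(sim_count=1000, sim_days=int(1e5), hist_days=int(1e6))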
This will delegate all the work to the workers and will become faster than pure numpy once your problem gets large enough to amortize the initial (constant) cost of bringing up the scheduler and distributing the context:
hist_days = int(1e6)
sim_days = int(1e5)
sim_count = int(1000)
# dask_way(sim_count, sim_days, hist_days)
# (code in this answer)
532 ms ± 53.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# numpy_way(sim_count, sim_days, hist_days)
# (code in the question)
4.47 s ± 79.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# dask_way_1d(sim_count, sim_days, hist_days)
# (code in the question)
5.76 s ± 91.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
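If the automatically chosen chunks turn out too large or too small for your workers, you can also pass an explicit chunks argument when creating random_days_panel. A minimal sketch, where the chunk sizes are assumptions you would tune for your cluster:

random_days_panel = da.random.randint(
    1, hist_days,
    size=(1, 1, sim_count, sim_days),
    chunks=(1, 1, 100, sim_days),  # assumed: 100 simulations per chunk, all days in one block
)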