
Converting numpy solution into dask (numpy indexing doesn't work in dask)

I'm trying to convert my Monte Carlo simulation from numpy to dask, because sometimes the arrays are too large to fit in memory. Therefore I set up a cluster of computers in the cloud: my dask cluster consists of 24 cores and 94 GB of memory. I prepared a simplified version of my code for this question.

My original numpy code looks like this:

import numpy as np

def numpy_way(sim_count, sim_days, hist_days):
    historical_data = np.random.normal(111.51, 10, hist_days)
    historical_multidim = np.empty(shape=(1, 1, sim_count, hist_days))
    historical_multidim[:, :, :, :] = historical_data

    random_days_panel = np.random.randint(low=1,
                                          high=hist_days,
                                          size=(1, 1, sim_count, sim_days))
    future_panel = historical_multidim[np.arange(1)[:, np.newaxis, np.newaxis, np.newaxis],
                                       np.arange(1)[:, np.newaxis, np.newaxis],
                                       np.arange(sim_count)[:, np.newaxis],
                                       random_days_panel]
    return future_panel.shape

Note: I'm just returning the shape of the numpy array here (but as it is numpy, the elements of future_panel are computed in memory).

Some words about the function:

  • I'm creating a random array historical_data - this is only 1D
  • Then this array is 'broadcast' to a 4D array (historical_multidim). The first two dimensions are not used here (but they are in my final application)
    • the 3rd dimension represents how many simulations are done
    • the 4th dimension is the number of days forecast into the future
  • random_days_panel is just an ndarray of randomly chosen days, so its final shape is (1, 1, sim_count, sim_days) (explained in the previous point)
  • future_panel is an ndarray with the randomly picked values from historical_multidim, i.e. an array generated from the historical data with the expected shape (1, 1, sim_count, sim_days)
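The gather described in the bullets above can be checked on a toy example (the small sizes below are illustrative, not from the question); every simulation row ends up being historical_data indexed at its own random days:

```python
import numpy as np

# Tiny version of the gather: 3 simulations, 4 future days, 5 historical days
hist_days, sim_count, sim_days = 5, 3, 4
historical_data = np.arange(100, 100 + hist_days)
historical_multidim = np.empty((1, 1, sim_count, hist_days))
historical_multidim[:, :, :, :] = historical_data

random_days_panel = np.random.randint(0, hist_days, size=(1, 1, sim_count, sim_days))
future_panel = historical_multidim[
    np.arange(1)[:, np.newaxis, np.newaxis, np.newaxis],
    np.arange(1)[:, np.newaxis, np.newaxis],
    np.arange(sim_count)[:, np.newaxis],
    random_days_panel,
]

# the index arrays broadcast to (1, 1, sim_count, sim_days), so the result
# is simply historical_data gathered at the randomly chosen days
assert future_panel.shape == (1, 1, sim_count, sim_days)
assert np.array_equal(future_panel, historical_data[random_days_panel])
```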

Now, the problem is that some of these steps are not implemented in dask:

  • historical_multidim[:, :, :, :] = historical_data - stack or broadcast_to is recommended instead
  • the slicing used to build future_panel is not possible in dask
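For the first point, da.broadcast_to can replace the in-place assignment directly; a minimal sketch with toy sizes (the numbers are illustrative):

```python
import numpy as np
import dask.array as da

hist_days, sim_count = 5, 3
historical_data = da.from_array(np.arange(hist_days), chunks=hist_days)

# instead of allocating an empty 4D array and assigning into it,
# broadcast the 1D array to the target 4D shape (no data is copied)
historical_multidim = da.broadcast_to(historical_data, (1, 1, sim_count, hist_days))
assert historical_multidim.shape == (1, 1, sim_count, hist_days)
```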

So I came up with this solution:

import numpy as np
import dask.array as da

def dask_way_1d(sim_count, sim_days, hist_days):
    historical_data = da.random.normal(111.51, 10, size=hist_days, chunks='auto')

    def get_random_days_1d():
        return np.random.randint(low=1, high=hist_days, size=sim_days)

    future_simulations = [historical_data[get_random_days_1d()] for _ in range(sim_count)]
    future_panel = da.stack(future_simulations)
    future_panel = da.broadcast_to(future_panel, shape=(1, 1, sim_count, sim_days))
    future_panel.compute()
    return future_panel.shape

This solution works, however it is much slower than the numpy solution. The problem is that get_random_days_1d() returns a numpy array. I tried to use a dask array instead, but ran into an error when computing historical_data[get_random_days_1d()] -> KilledWorker: ("('normal-932553ab53ba4c7e908d61724430bbb2', 0)", ...

Another solution looks like this:

import numpy as np
import dask.array as da

def dask_way_nd(sim_count, sim_days, hist_days):
    historical_data_1d = da.random.normal(111.51, 10, size=hist_days, chunks='auto')
    historical_data_2d = da.broadcast_to(historical_data_1d, shape=(sim_count, hist_days))
    random_days_panel = np.random.randint(low=1,
                                          high=hist_days,
                                          size=(sim_count, sim_days))
    future_panel = historical_data_2d[np.arange(sim_count)[:, np.newaxis], random_days_panel]
    future_panel = da.broadcast_to(future_panel, shape=(1, 1, sim_count, sim_days))
    future_panel.compute()
    return future_panel.shape

This solution fails at future_panel = historical_data_2d[np.arange(sim_count)[:, np.newaxis], random_days_panel] with the error: NotImplementedError: Don't yet support nd fancy indexing

So my question is: is there any way to implement the same behavior as in my numpy code, but with better performance (i.e. faster execution time)?

patex1987 asked Aug 23 '18


1 Answer

Chunk random_days_panel instead of historical_data and use da.map_blocks:

import numpy as np
import dask.array as da

def dask_way(sim_count, sim_days, hist_days):
    # shared historical data
    # on a cluster you'd load this on each worker, e.g. from an NPZ file
    historical_data = np.random.normal(111.51, 10, size=hist_days)

    random_days_panel = da.random.randint(
        1, hist_days, size=(1, 1, sim_count, sim_days)
    )
    future_panel = da.map_blocks(
        lambda chunk: historical_data[chunk], random_days_panel, dtype=float
    )

    future_panel.compute()

    return future_panel

This will delegate all the work to the workers and will become faster than pure numpy once your problem gets large enough to amortize the initial (constant) cost of bringing up the scheduler and distributing the context:

hist_days = int(1e6)
sim_days = int(1e5)
sim_count = int(1000)

# dask_way(sim_count, sim_days, hist_days)
# (code in this answer)
532 ms ± 53.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# numpy_way(sim_count, sim_days, hist_days)
# (code in the question)
4.47 s ± 79.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# dask_way_1d(sim_count, sim_days, hist_days)
# (code in the question)
5.76 s ± 91.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
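On a toy problem (sizes chosen here just for illustration) you can also check that the map_blocks gather produces the same values as plain numpy indexing:

```python
import numpy as np
import dask
import dask.array as da

hist_days, sim_count, sim_days = 10, 4, 6
historical_data = np.random.normal(111.51, 10, size=hist_days)

random_days_panel = da.random.randint(
    1, hist_days, size=(1, 1, sim_count, sim_days), chunks=(1, 1, 2, sim_days)
)
future_panel = da.map_blocks(
    lambda chunk: historical_data[chunk], random_days_panel, dtype=float
)

# compute both in one pass so the same random draw is used for the check
result, idx = dask.compute(future_panel, random_days_panel)
assert result.shape == (1, 1, sim_count, sim_days)
assert np.array_equal(result, historical_data[idx])
```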
FirefoxMetzger answered Oct 25 '22