
Applying a function along an axis of a dask array

Tags: python, dask

I'm analyzing ocean temperature data from a climate model simulation. The 4D data arrays (time, depth, latitude, longitude; denoted dask_array below) typically have a shape of (6000, 31, 189, 192) and a size of ~25GB, hence my desire to use dask: I've been getting memory errors trying to process these arrays using numpy.

I need to fit a cubic polynomial along the time axis at each level / latitude / longitude point and store the resulting 4 coefficients. I've therefore set chunksize=(6000, 1, 1, 1) so I have a separate chunk for each grid point.

This is my function for getting the coefficients of the cubic polynomial (time_axis, which holds the time coordinate values, is a global 1D numpy array defined elsewhere):

def my_polyfit(data):
    # numpy.polyfit expects (x, y, deg): time is x, the data series is y
    return numpy.polyfit(time_axis, data.squeeze(), 3)

(So in this case, numpy.polyfit returns a 1D array of length 4)

and this is the command I thought I'd need to apply it to each chunk:

dask_array.map_blocks(my_polyfit, chunks=(4, 1, 1, 1), drop_axis=0, new_axis=0).compute()

The idea is that the time axis is now gone (hence drop_axis=0) and there's a new coefficient axis in its place (of length 4).

When I run this command I get IndexError: tuple index out of range, so I'm wondering where/how I've misunderstood the use of map_blocks?

Damien Irving asked Mar 29 '16 03:03



2 Answers

I suspect that your experience will be smoother if your function returns an array of the same dimension that it consumes. E.g. you might consider defining your function as follows:

def my_polyfit(data):
    return np.polyfit(data.squeeze(), ...)[:, None, None, None]

Then you can probably ignore the new_axis and drop_axis arguments.

Performance-wise you might also want to consider using a larger chunksize. At 6000 numbers per chunk you have over a million chunks, which means you'll probably spend more time in scheduling than in actual computation. Generally I shoot for chunks that are a few megabytes in size. Of course, increasing chunksize would cause your mapped function to become more complex.
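The chunk-count estimate above can be checked with a little arithmetic. This is only a sketch, not part of the original answer: the helper name chunk_stats is hypothetical, the 4-byte item size assumes float32 data (which is consistent with the ~25GB quoted in the question), and the coarser chunking (6000, 1, 9, 16) is just one illustrative choice that lands at a few megabytes per chunk.

```python
from math import prod

shape = (6000, 31, 189, 192)   # (time, depth, lat, lon) from the question
itemsize = 4                   # assumption: float32 values


def chunk_stats(shape, chunks, itemsize):
    """Return (number of chunks, bytes in one full chunk) for a regular chunking."""
    # Ceiling division per axis gives the number of chunks along that axis.
    n_chunks = prod(-(-s // c) for s, c in zip(shape, chunks))
    chunk_bytes = prod(chunks) * itemsize
    return n_chunks, chunk_bytes


# One chunk per grid point, as in the question.
per_point = chunk_stats(shape, (6000, 1, 1, 1), itemsize)

# A hypothetical coarser chunking that keeps the whole time axis in one chunk.
coarser = chunk_stats(shape, (6000, 1, 9, 16), itemsize)

print(per_point)
print(coarser)
```

Under these assumptions the per-point chunking gives over a million chunks of about 24 kB each, while the coarser one gives a few thousand chunks of roughly 3.5 MB each.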

Example

In [1]: import dask.array as da

In [2]: import numpy as np

In [3]: def f(b):
   ...:     return np.polyfit(b.squeeze(), np.arange(5), 3)[:, None, None, None]
   ...: 

In [4]: x = da.random.random((5, 3, 3, 3), chunks=(5, 1, 1, 1))

In [5]: x.map_blocks(f, chunks=(4, 1, 1, 1)).compute()
Out[5]: 
array([[[[ -1.29058580e+02,   2.21410738e+02,   1.00721521e+01],
         [ -2.22469851e+02,  -9.14889627e+01,  -2.86405832e+02],
         [  1.40415805e+02,   3.58726232e+02,   6.47166710e+02]],
         ...
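As noted above, larger chunks make the mapped function more complex, because each block then holds many grid points rather than one. One possible way to handle that, sketched here with NumPy only, relies on np.polyfit accepting a 2D y array and fitting each column independently. The function name polyfit_block and the small random test block are illustrative assumptions, not from the original answer. np.polyfit takes x first and y second, so the sketch passes the time coordinate as x on the assumption that the data are being fitted against time.

```python
import numpy as np


def polyfit_block(block, time_axis, deg=3):
    """Fit a polynomial along axis 0 of a 4D block.

    Returns coefficients with shape (deg + 1, depth, lat, lon).
    """
    n_time = block.shape[0]
    # Flatten all non-time axes into columns: shape (n_time, n_points).
    flat = block.reshape(n_time, -1)
    # With a 2D y, np.polyfit fits every column against the same x.
    coeffs = np.polyfit(time_axis, flat, deg)
    # Restore the spatial axes behind the new coefficient axis.
    return coeffs.reshape((deg + 1,) + block.shape[1:])


# Small in-memory stand-in for one block (hypothetical sizes).
rng = np.random.default_rng(0)
time_axis = np.arange(50, dtype=float)
block = rng.random((50, 2, 3, 4))

coeffs = polyfit_block(block, time_axis)
print(coeffs.shape)
```

A function of this kind keeps the number of dimensions unchanged, so it should fit the same map_blocks pattern as f above, provided the time axis stays in a single chunk and the chunks argument describes the length-4 leading axis.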
MRocklin answered Oct 19 '22 07:10



Kind of late to the party, but figured this could use an alternative answer based on new features in Dask. In particular, we added apply_along_axis, which behaves basically like NumPy's apply_along_axis but operates on Dask Arrays. This results in somewhat simpler syntax. It also avoids the need to rechunk your data before applying your custom function to each 1-D piece: it makes no real requirements of your initial chunking, which it tries to preserve in the end result (except along the axis that is either reduced or replaced).

In [1]: import dask.array as da

In [2]: import numpy as np

In [3]: def f(b):
   ...:     return np.polyfit(b, np.arange(len(b)), 3)
   ...: 

In [4]: x = da.random.random((5, 3, 3, 3), chunks=(5, 1, 1, 1))

In [5]: da.apply_along_axis(f, 0, x).compute()
Out[5]: 
array([[[[  2.13570599e+02,   2.28924503e+00,   6.16369231e+01],
         [  4.32000311e+00,   7.01462518e+01,  -1.62215514e+02],
         [  2.89466687e+02,  -1.35522215e+02,   2.86643721e+02]],
         ...
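Because da.apply_along_axis is described above as mirroring NumPy's version, the expected result shape can be checked on a small in-memory array with NumPy alone before running anything on the full dataset. This is a sketch under that assumption: fit_cubic and the array sizes are hypothetical, and the time coordinate is passed as x (the first argument of np.polyfit) on the assumption that the data are being fitted against time.

```python
import numpy as np

time_axis = np.arange(5, dtype=float)


def fit_cubic(series):
    """Fit a cubic to one 1-D time series; returns 4 coefficients."""
    return np.polyfit(time_axis, series, 3)


# Small stand-in array with the time axis first (hypothetical sizes).
x = np.random.default_rng(1).random((5, 3, 3, 3))

# The time axis (length 5) is replaced by a coefficient axis (length 4).
coeffs = np.apply_along_axis(fit_cubic, 0, x)
print(coeffs.shape)
```

The function receives a plain 1-D slice each time, so it needs no squeeze or added axes.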
jakirkham answered Oct 19 '22 07:10
