I'm analyzing ocean temperature data from a climate model simulation where the 4D data arrays (time, depth, latitude, longitude; denoted dask_array
below) typically have a shape of (6000, 31, 189, 192) and a size of ~25GB (hence my desire to use dask; I've been getting memory errors trying to process these arrays using numpy).
I need to fit a cubic polynomial along the time axis at each level / latitude / longitude point and store the resulting 4 coefficients. I've therefore set chunksize=(6000, 1, 1, 1)
so I have a separate chunk for each grid point.
This is my function for getting the coefficients of the cubic polynomial (the time_axis
axis values are a global 1D numpy array defined elsewhere):
def my_polyfit(data):
return numpy.polyfit(data.squeeze(), time_axis, 3)
(So in this case, numpy.polyfit
returns a list of length 4)
and this is the command I thought I'd need to apply it to each chunk:
dask_array.map_blocks(my_polyfit, chunks=(4, 1, 1, 1), drop_axis=0, new_axis=0).compute()
Whereby the time axis is now gone (hence drop_axis=0
) and there's a new coefficient axis in it's place (of length 4).
When I run this command I get IndexError: tuple index out of range
, so I'm wondering where/how I've misunderstood the use of map_blocks
?
I suspect that your experience will be smoother if your function returns an array of the same dimension that it consumes. E.g. you might consider defining your function as follows:
def my_polyfit(data):
return np.polyfit(data.squeeze(), ...)[:, None, None, None]
Then you can probably ignore the new_axis
, drop_axis
bits.
Performance-wise you might also want to consider using a larger chunksize. At 6000 numbers per chunk you have over a million chunks, which means you'll probably spend more time in scheduling than in actual computation. Generally I shoot for chunks that are a few megabytes in size. Of course, increasing chunksize would cause your mapped function to become more complex.
In [1]: import dask.array as da
In [2]: import numpy as np
In [3]: def f(b):
return np.polyfit(b.squeeze(), np.arange(5), 3)[:, None, None, None]
...:
In [4]: x = da.random.random((5, 3, 3, 3), chunks=(5, 1, 1, 1))
In [5]: x.map_blocks(f, chunks=(4, 1, 1, 1)).compute()
Out[5]:
array([[[[ -1.29058580e+02, 2.21410738e+02, 1.00721521e+01],
[ -2.22469851e+02, -9.14889627e+01, -2.86405832e+02],
[ 1.40415805e+02, 3.58726232e+02, 6.47166710e+02]],
...
Kind of late to the party, but figured this could use an alternative answer based on new features in Dask. In particular, we added apply_along_axis
, which behaves basically like NumPy's apply_along_axis
except for Dask Arrays instead. This results in somewhat simpler syntax. Also it avoids the need to rechunk your data before applying your custom function to each 1-D piece and makes no real requirements of your initial chunking, which it tries to preserve in the end result (excepting the axis that is either reduced or replaced).
In [1]: import dask.array as da
In [2]: import numpy as np
In [3]: def f(b):
...: return np.polyfit(b, np.arange(len(b)), 3)
...:
In [4]: x = da.random.random((5, 3, 3, 3), chunks=(5, 1, 1, 1))
In [5]: da.apply_along_axis(f, 0, x).compute()
Out[5]:
array([[[[ 2.13570599e+02, 2.28924503e+00, 6.16369231e+01],
[ 4.32000311e+00, 7.01462518e+01, -1.62215514e+02],
[ 2.89466687e+02, -1.35522215e+02, 2.86643721e+02]],
...
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With