Memory errors using xarray + dask - use groupby or apply_ufunc?

I am using xarray as the basis of my workflow for analysing fluid turbulence data, but I'm having trouble leveraging dask correctly to limit memory usage on my laptop.

I have a DataArray n with dimensions ('t', 'x', 'z'), which I've split into chunks of 5 along the z dimension:

<xarray.DataArray 'n' (t: 801, x: 960, z: 512)>
dask.array<shape=(801, 960, 512), dtype=float32, chunksize=(801, 960, 5)>
Coordinates:
* t              (t) int64 200 201 202 203 204 205 206 207 208 209 210 211 ...
* x              (x) int64 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 ...
* z              (z) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 ...  

I want to calculate the root mean squared fluctuations of n over t, and return a reduced DataArray with dimensions ('x', 'z'). I want to use dask to perform this operation one chunk at a time, as I only have a couple of GB of RAM available on my laptop.
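
For reference, here is a minimal sketch of how a DataArray like this can be constructed; the random data and exact coordinate values below are just stand-ins for my real turbulence field:

import numpy as np
import xarray as xr
import dask.array as darr

# Stand-in for the real data: same shape, dtype and chunking as the repr above
lazy = darr.random.random((801, 960, 512), chunks=(801, 960, 5)).astype('float32')
n = xr.DataArray(lazy, dims=('t', 'x', 'z'), name='n',
                 coords={'t': np.arange(200, 1001),
                         'x': np.arange(2, 962),
                         'z': np.arange(512)})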

I've written a generalized ufunc to calculate the root mean square of a dask array:

import numpy as np

def rms_gufunc(x, axis=-1):
    """Generalized ufunc to calculate the root mean square along the given axis."""
    squares = np.square(x)
    return np.sqrt(squares.mean(axis=axis))
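
As a quick sanity check on a plain numpy array (example values of my own):

print(rms_gufunc(np.array([3.0, 4.0])))  # sqrt((9 + 16) / 2) ≈ 3.536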

but now I'm not sure what the best way to apply this is. As far as I can see I can either use (1) xarray.apply_ufunc, or (2) groupby.reduce.

1. Use apply_ufunc:

I can apply this function using xarray.apply_ufunc:

import xarray as xr

def rms(da, dim=None):
    """
    Reduces a dataarray by calculating the root mean square along dimension dim.
    """

    if dim is None:
        raise ValueError('Must supply a dimension along which to calculate rms')

    return xr.apply_ufunc(rms_gufunc, da,
                          input_core_dims=[[dim]],
                          dask='parallelized', output_dtypes=[da.dtype])

n_rms = rms(data['n'], dim='t')
n_rms.load()  # Trigger computation

This seems to work, but it feels more complex than necessary.

2. Use groupby.reduce:

The xarray docs seem to suggest that this is a "split-apply-combine" operation, which I should be able to accomplish with something like

n_rms = data['n'].groupby('z').reduce(rms_gufunc, dim='t')

However, this causes a MemoryError, and I'm pretty sure that's not what I want the groupby step to achieve anyway. Am I supposed to use groupby_bins to bin the data into the chunks I made along z?

I would like to know a) if I'm using apply_ufunc correctly, and b) how I would do the same thing using groupby.

asked Aug 02 '18 by ThomasNicholas


1 Answer

Since it is a 3D array, I assume the following scenario.

The speed of water molecules over an x-z plane (960 μm × 512 μm) changes over time (801 fs). Find the root mean squared fluctuation (rmsf) of the speed at each element of the x-z plane over all time.

The numpy code would be:

import numpy as np

# Root mean square over the time axis (axis 0)
xz_plane_rmsf = np.sqrt((data ** 2).mean(axis=0))

where data is a 3D numpy array with shape=(801, 960, 512). The 0th, 1st, and 2nd dimensions of data represent time, x-coordinates, and z-coordinates. Each element of data represents the average speed of water molecules at time t and at coordinates x and z.
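
A quick shape check with throwaway data (a small synthetic array of my own, same dimension order):

import numpy as np

data = np.random.rand(8, 9, 5).astype('float32')  # tiny stand-in for (t, x, z)
xz_plane_rmsf = np.sqrt((data ** 2).mean(axis=0))
print(xz_plane_rmsf.shape)  # (9, 5): one value per (x, z) point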

The equivalent code for a Dask array is:

# Make lazy array (np.sqrt dispatches on dask arrays, so this stays lazy)
xz_plane_rmsf = np.sqrt((data ** 2).mean(axis=0))
# Evaluate the array
xz_plane_rmsf = xz_plane_rmsf.compute()

where data is a 3D Dask array.

The only remaining problem is getting a Dask array out of the xarray object. I don't use xarray, but judging by the repr in the question, the DataArray already wraps a Dask array:

<xarray.DataArray 'n' (t: 801, x: 960, z: 512)>
dask.array<shape=(801, 960, 512), dtype=float32, chunksize=(801, 960, 5)>
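
If I read the xarray docs correctly, the underlying Dask array can be pulled out through the DataArray's .data attribute, so (assuming the variable is still data['n'] as in the question) the whole computation would look something like:

import numpy as np

dask_arr = data['n'].data  # underlying dask.array; no copy, still lazy
xz_plane_rmsf = np.sqrt((dask_arr ** 2).mean(axis=0)).compute()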
answered Nov 10 '22 by R zu