I am using xarray as the basis of my workflow for analysing fluid turbulence data, but I'm having trouble leveraging dask correctly to limit memory usage on my laptop.
I have a dataarray n with dimensions ('t', 'x', 'z'), which I've split into chunks of 5 along the z dimension:
<xarray.DataArray 'n' (t: 801, x: 960, z: 512)>
dask.array<shape=(801, 960, 512), dtype=float32, chunksize=(801, 960, 5)>
Coordinates:
* t (t) int64 200 201 202 203 204 205 206 207 208 209 210 211 ...
* x (x) int64 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 ...
* z (z) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 ...
I want to calculate the root mean squared fluctuations of n over t and return a reduced dataarray with dimensions ('x', 'z'). I want to utilise dask to perform this operation on only one chunk at a time, because I only have a couple of GB of RAM available on my laptop at any time.
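For context, the chunked array above comes from opening the dataset lazily along z; a minimal sketch of that step, assuming a netCDF file (the filename here is just a placeholder):

import xarray as xr

# Hypothetical example: open the dataset lazily, chunking only along z
data = xr.open_dataset('turbulence.nc', chunks={'z': 5})
n = data['n']  # dask-backed DataArray with chunksize (801, 960, 5)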
I've written a generalized ufunc to calculate the root mean square of a dask array:
import numpy as np

def rms_gufunc(x, axis):
    """Generalized ufunc to calculate root mean squared value of a 1d dask array."""
    squares = np.square(x)
    return np.sqrt(squares.mean(axis=-1))
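As a quick sanity check (toy values, purely illustrative), the function reduces the last axis as expected; note that apply_ufunc moves each dimension listed in input_core_dims to the end of the array, which is why axis=-1 is hard-coded:

a = np.array([[3.0, 4.0],
              [6.0, 8.0]])      # shape (2, 2)
print(rms_gufunc(a, axis=-1))   # [3.53553391 7.07106781]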
Now I'm not sure of the best way to apply this. As far as I can see, I can use either (1) xarray.apply_ufunc or (2) groupby.reduce.
1. Use apply_ufunc:
I can apply this function using xarray.apply_ufunc:
def rms(da, dim=None):
    """
    Reduces a dataarray by calculating the root mean square along dimension dim.
    """
    if dim is None:
        raise ValueError('Must supply a dimension along which to calculate rms')
    return xr.apply_ufunc(rms_gufunc, da,
                          input_core_dims=[[dim]],
                          dask='parallelized', output_dtypes=[da.dtype])
n_rms = rms(data['n'], dim='t')
n_rms.load() # Trigger computation
which seems to work, but feels more complex than necessary.
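One way to check that the computation really runs one chunk at a time is dask's diagnostics; a rough sketch (the dt sampling interval is arbitrary):

from dask.diagnostics import ProgressBar, ResourceProfiler

n_rms = rms(data['n'], dim='t')
print(n_rms.data)  # still a lazy dask array with chunks (960, 5) at this point

with ResourceProfiler(dt=0.25) as rprof, ProgressBar():
    n_rms.load()   # computes chunk by chunk; rprof.results records memory/CPU usage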
2. Use groupby.reduce:
The xarray docs seem to suggest that this is a "split-apply-combine" operation which I should be able to accomplish through something like
n_rms = data['n'].groupby('z').reduce(rms_gufunc, dim='t')
However, this causes a MemoryError, and I'm pretty sure that's not what I want to achieve with the groupby step anyway. Am I supposed to be using groupby_bins to bin the data into the chunks I made along z?
I would like to know a) if I'm using apply_ufunc correctly, and b) how I would do the same thing using groupby.
Since it is a 3D array, I assume the following case: the speed of water molecules over an x-z plane (960 μm × 512 μm) changes over time (801 fs). Find the RMSF (root mean square fluctuation) of the speed at each element of the x-z plane over all time.
The numpy code would be:
import numpy as np
xz_plane_rmsf = np.sqrt((data ** 2).mean(axis=0))
where data is a 3D numpy array with shape=(801, 960, 512).
The 0th, 1st, and 2nd dimensions of data represent time, x-coordinates, and z-coordinates, respectively. Each element of data represents the average speed of water molecules at time t and at coordinates x and z.
The equivalent code for a Dask array is:
# Make lazy array
xz_plane_rmsf = np.sqrt((data ** 2).mean(axis=0))
# Evaluate the array
xz_plane_rmsf = xz_plane_rmsf.compute()
where data is a 3D Dask array.
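A self-contained toy version of this, using random data chunked along the last axis like the array in the question (the shape and chunks are copied from the question; the values are fake):

import dask.array as da
import numpy as np

# Fake stand-in for the real data, chunked along z
data = da.random.random((801, 960, 512), chunks=(801, 960, 5)).astype(np.float32)

xz_plane_rmsf = np.sqrt((data ** 2).mean(axis=0))  # lazy
result = xz_plane_rmsf.compute()                   # evaluates the chunks independently, so only a few are in memory at once
print(result.shape)  # (960, 512)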
The only remaining problem is getting a Dask array out of the xarray object. I don't use xarray, but it looks like it already wraps a Dask array:
<xarray.DataArray 'n' (t: 801, x: 960, z: 512)>
dask.array<shape=(801, 960, 512), dtype=float32, chunksize=(801, 960, 5)>
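So the same reduction should work either directly through xarray (which passes the operations on to dask) or on the underlying dask array pulled out via .data; a rough sketch, assuming the Dataset is called data as in the question:

import numpy as np
import xarray as xr

n = data['n']                            # dask-backed DataArray
n_rms = np.sqrt((n ** 2).mean(dim='t'))  # still lazy, dims ('x', 'z')
n_rms.load()                             # triggers the chunk-wise computation

# Or operate on the raw dask array and wrap the result back up
raw = n.data                             # dask.array with chunks (801, 960, 5)
n_rms2 = xr.DataArray(np.sqrt((raw ** 2).mean(axis=0)).compute(),
                      dims=('x', 'z'), coords={'x': n['x'], 'z': n['z']})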