Create and write xarray DataArray to NetCDF in chunks

Is it possible to create an out-of-core DataArray and write it chunk-by-chunk to a NetCDF4 file using xarray?

For example, I want to be able to do this in an out-of-core fashion when the dimensions are much bigger and I thus cannot store the whole array in memory:

import numpy as np
import xarray as xr

num_steps = 20
num_times = 100

# Create DataArray
d = xr.DataArray(np.zeros([num_steps, num_times], np.float32),
                 {'Step': np.arange(num_steps),
                  'Time': np.arange(num_times)},
                 ('Step', 'Time'))

# Computation
for i in range(num_steps):
    d[i, :] = i

# Write to file
d.to_netcdf('test.nc')

So I don't want to create the whole NumPy array in memory, and I want the computation and write-to-file stages to be done one chunk at a time (chunked over the Step dimension in this example).

Update: It seems (from @jhamman's answer) that it may not be possible to implement my example above using xarray. I am mainly interested in developing a greater understanding of out-of-core computation with xarray, so I do not have a specific computation in mind, but since I have been asked for a more complicated example, one potential application is:

for i in range(num_steps):
    u[:] = f(u)
    s[:] = g(s)
    d[i, :] = u[:] * s[:]

where u and s are xr.DataArrays of dimension Time, and f and g are PDE solvers that only depend on the input array from the previous step. Let's say there are 1000 steps, but the Time dimension is so big that I can only store one or two such arrays in memory, so assignments to d must be written to disk and the associated memory then freed.

asked Oct 26 '17 by user3708067

2 Answers

Yes, xarray supports out-of-core arrays and writing in chunks. You will need to write your computation using xarray operations and Dask arrays instead of NumPy arrays. The xarray docs should be helpful here.
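As a minimal sketch of the idea (assuming a one-chunk-per-step layout matching the question's sizes, with da.zeros standing in for a real lazy computation), a dask-backed DataArray can be written to disk without ever materializing the full array:

import dask.array as da
import numpy as np
import xarray as xr

num_steps, num_times = 20, 100

# Lazy dask array: one chunk per step, nothing computed yet
lazy = da.zeros((num_steps, num_times), chunks=(1, num_times), dtype=np.float32)

d = xr.DataArray(lazy,
                 {'Step': np.arange(num_steps),
                  'Time': np.arange(num_times)},
                 ('Step', 'Time'))

# to_netcdf evaluates and streams the chunks to disk one at a time
d.to_netcdf('test.nc')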

Update: For a simulation like this, you would need to compute each call to f and g using dask.delayed. Then you could convert the results into dask arrays with dask.array.from_delayed, wrap them back in an xarray.DataArray, and write the data directly to disk with to_netcdf(). The computation proceeds in a streaming fashion, with f() and g() computed in parallel and no more than a few time-steps ever loaded into memory:

import dask
import dask.array as da
import numpy as np
import xarray

def f(x):
    return 1.1 * x

def g(x):
    return 0.9 * x

num_steps = 1000
num_times = int(1e6)

u = np.ones(num_times)
s = np.ones(num_times)

arrays = []
for i in range(num_steps):
    # Each step is a lazy (delayed) call; nothing is computed yet
    u = dask.delayed(f)(u)
    s = dask.delayed(g)(s)
    # Wrap the delayed product as a dask array of known shape/dtype
    product = da.from_delayed(u * s, shape=(num_times,), dtype=float)
    arrays.append(product)

# Stack the per-step arrays into a 2D (step, time) dask array
stacked = da.stack(arrays)
data_array = xarray.DataArray(stacked, dims=['step', 'time'])
# Writing triggers the computation, streaming results to disk
%time data_array.to_netcdf('results.nc')
# CPU times: user 7.44 s, sys: 13.5 s, total: 20.9 s
# Wall time: 29.4 s

You'll notice that xarray is pretty peripheral to this computation: most of the computation was done with dask/numpy. You could easily do this with xarray objects, too, but we don't have a convenient way to pass labeled array metadata through dask delayed objects, so either way you would need to reconstruct metadata on the other side.
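For example, re-attaching the labels is just a matter of supplying the coordinates again when rebuilding the DataArray; a hypothetical variant of the last lines above, reusing the names from that snippet:

data_array = xarray.DataArray(stacked, dims=['step', 'time'],
                              coords={'step': np.arange(num_steps),
                                      'time': np.arange(num_times)})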

You could argue that using dask here is overkill, and you would probably be right. Even if you want to use dask for parallelization, you still probably want to checkpoint the simulation after each step in the form of a valid netCDF file.

So a simple loop that extends a netCDF file at each iteration is probably what you want. This is not yet supported by xarray, but it would be a nice feature to have. Something like the following interface should be possible:

for i in range(num_steps):
    u[:] = f(u)
    s[:] = g(s)
    d[:] = u[:] * s[:]
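    # 'extend' here is a hypothetical keyword, not part of xarray's current API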
    d.to_netcdf('results.nc', extend='step')

In the meantime, you could write separate files for each step, e.g.,

for i in range(num_steps):
    u[:] = f(u)
    s[:] = g(s)
    d[:] = u[:] * s[:]
    d.to_netcdf('results-%04d.nc' % i)

Then you could load all your data together and consolidate it into a single file afterwards using open_mfdataset, e.g.,

combined = xarray.open_mfdataset('results-*.nc', autoclose=True)
combined.to_netcdf('results-combined.nc')
answered by shoyer

Dask arrays do not currently support item assignment; see "Item assignment to Python dask array objects".

So this will not work if d is an xarray.DataArray with a dask.array under the hood.
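A quick way to see the limitation (with dask releases from that era; newer dask versions have since added limited assignment support):

import dask.array as da

x = da.zeros((10,), chunks=(5,))
x[0] = 1  # raises: 'Array' object does not support item assignment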

Additionally, none of the current Xarray backends support chunked writes. EDIT: As @shoyer points out, it is possible to have xarray write chunked arrays incrementally. However, for your use case here, since you seem to need item assignment, it may be necessary to use the netCDF4-python library directly:

from netCDF4 import Dataset

num_steps = 20
num_times = 100

f = Dataset('test.nc', mode='w')
f.createDimension("Step", num_steps)
f.createDimension("Time", num_times)
d = f.createVariable("d", "f4", ("Step", "Time"))

# Computation
for i in range(num_steps):
    d[i, :] = i

f.close()

I assume your real computation is more complicated than this example, so you may want to replace = i with something that uses xarray/dask.
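For instance, here is a sketch of the loop from the question's update using netCDF4 directly (f and g below are placeholder stand-ins for the actual PDE solvers): each step's product is written straight into the file, so only the current arrays live in memory.

import numpy as np
from netCDF4 import Dataset

num_steps, num_times = 1000, 100

def f(x):  # placeholder for the user's PDE solver
    return 1.1 * x

def g(x):  # placeholder for the user's PDE solver
    return 0.9 * x

nc = Dataset('results.nc', mode='w')
nc.createDimension("Step", num_steps)
nc.createDimension("Time", num_times)
d = nc.createVariable("d", "f4", ("Step", "Time"))

u = np.ones(num_times, dtype=np.float32)
s = np.ones(num_times, dtype=np.float32)
for i in range(num_steps):
    u = f(u)
    s = g(s)
    d[i, :] = u * s  # written to the file, not held in RAM
nc.close()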

answered by jhamman