
How to load and process Zarr files using Dask and xarray

I have monthly Zarr files in S3 containing gridded temperature data. I would like to pull down multiple months of data for a single lat/lon and build a dataframe of that time series. Some pseudo code:

import s3fs
import xarray as xr

s3 = s3fs.S3FileSystem()  # reuse one filesystem object for all files

datasets = []
for file in files:
    zarr_store = s3fs.S3Map(file, s3=s3)
    zarr = xr.open_zarr(store=zarr_store, consolidated=True)
    # select the single grid point and the requested date range
    ds = zarr.sel(latitude=lat,
                  longitude=long,
                  time=slice(start_date.strftime("%Y-%m-%d"),
                             end_date.strftime("%Y-%m-%d")))
    datasets.append(ds)

con = xr.concat(datasets, dim='time')
df = con.to_dataframe()

This code works, but it is incredibly slow. I was hoping to use Dask to speed it up. My plan was to change the method to process one file at a time and return a dataframe, then call client.map() to generate all the dataframes and concat them together at the end. I wound up with something similar to this:

from datetime import date
from itertools import repeat

import pandas as pd
import s3fs
import xarray as xr
from dask.distributed import Client


def load(file, lat: float, long: float, start_date, end_date):
    s3 = s3fs.S3FileSystem()
    s3_path = file['s3_bucket'] + '/' + file['zarr_s3_key']
    zarr_store = s3fs.S3Map(s3_path, s3=s3)
    zarr = xr.open_zarr(store=zarr_store, consolidated=True)

    # select the single grid point and the requested date range
    ds = zarr.sel(latitude=lat,
                  longitude=long,
                  time=slice(start_date.strftime("%Y-%m-%d"),
                             end_date.strftime("%Y-%m-%d")))

    # pull the selected values and the matching time coordinate into memory
    tmp = ds.to_array().values.squeeze()
    df_time = ds.coords['time'].values

    df = pd.DataFrame({'time': df_time, 'lat': lat, 'long': long, 'dat': tmp})
    df.set_index(['time', 'lat', 'long'], inplace=True)

    return df

if __name__ == '__main__':
    client = Client('tcp://xxx')

    start_date = date(2000, 1, 7)
    end_date = date(2000, 10, 20)
    lat = 2
    lon = 10

    # get the s3 locations of the zarr files from the db
    files = get_files()

    # try just running with one file
    res = client.submit(load, files[0], lat, lon, start_date, end_date)

    # run them all, one task per file
    futures = client.map(load, files,
                         repeat(lat), repeat(lon),
                         repeat(start_date), repeat(end_date))
    dfs = client.gather(futures)

    # stitch the per-file dataframes into one time series
    df = pd.concat(dfs)

This code runs fine when the client is connected to just my local machine, but when I connect to a remote cluster I get the following error on the xr.open_zarr call:

KeyError: 'XXX/data.zarr/.zmetadata'

I tried changing the code to load the Zarr datasets outside the method call and pass them in, but that only gave me NaNs as a result. Is there something I am missing? Is this not the correct way to solve what I'm trying to do?

asked Oct 11 '25 by David
1 Answer

If you just want to extract a time series at a point, you can create a Dask client and then let xarray do the work in parallel. The example below uses just one Zarr dataset, but as long as the workers stay busy processing the chunks within each Zarr store, you wouldn't gain anything from opening the Zarr files in parallel yourself.

import xarray as xr
import fsspec
import hvplot.xarray  # registers the .hvplot accessor on xarray objects

from dask.distributed import Client

client = Client()  # local cluster; point this at your scheduler if you have one

url = 's3://mur-sst/zarr'  # Amazon Public Data

ds = xr.open_zarr(fsspec.get_mapper(url, anon=True), consolidated=True)

# lazy selection of one point and a time range; .persist() loads it on the workers
timeseries = ds['analysed_sst'].sel(time=slice('2015-01-01', '2020-01-01'),
                                    lat=43,
                                    lon=-70).persist()

timeseries.hvplot()

This produces an interactive plot of the extracted SST time series.
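The same idea extends to the many monthly stores in the question: open each one lazily, concatenate them along time, and only then pull out the point. Here is a rough sketch (assuming, as in the question, that get_files() returns records with the per-month bucket and key, and that the coordinates are named latitude/longitude):

import s3fs
import xarray as xr
from dask.distributed import Client

client = Client()  # or your remote scheduler address

s3 = s3fs.S3FileSystem()

# open every monthly store lazily; no chunk data is read yet
datasets = [
    xr.open_zarr(s3fs.S3Map(f"{f['s3_bucket']}/{f['zarr_s3_key']}", s3=s3),
                 consolidated=True)
    for f in get_files()  # from the question: returns the per-month S3 locations
]

# one lazy dataset spanning all months
ds = xr.concat(datasets, dim='time')

# select the point and date range, then compute; only the chunks that
# cover this point are read from S3, in parallel on the workers
point = ds.sel(latitude=2, longitude=10,
               time=slice('2000-01-07', '2000-10-20'))
df = point.to_dataframe()

Because everything before to_dataframe() is lazy, xarray only needs the chunks that intersect the selected point and dates, and with a Dask client attached those reads run in parallel across the workers, so there is usually no need to map a load() function over the files yourself.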

Here is the Full Jupyter Notebook

answered Oct 14 '25 by Rich Signell

