I have monthly Zarr files in S3 containing gridded temperature data. I would like to pull down multiple months of data for one lat/lon point and create a DataFrame of that time series. Some pseudocode:
import s3fs
import xarray as xr

s3 = s3fs.S3FileSystem()
datasets = []
for file in files:
    # open each monthly Zarr store and select the point and time window
    zarr_store = s3fs.S3Map(file, s3=s3)
    zarr = xr.open_zarr(store=zarr_store, consolidated=True)
    ds = zarr.sel(latitude=lat,
                  longitude=long,
                  time=slice(start_date.strftime("%Y-%m-%d"),
                             end_date.strftime("%Y-%m-%d")))
    datasets.append(ds)

con = xr.concat(datasets, dim='time')
df = con.to_dataframe()
This code works, but it is incredibly slow. I was hoping to use Dask to speed it up. My plan was to change the method to process one file at a time and return a DataFrame, then call client.map() to generate all the DataFrames and concatenate them together at the end. I wound up with something similar to this:
from datetime import date
from itertools import repeat

import pandas as pd
import s3fs
import xarray as xr
from dask.distributed import Client


def load(file, lat: float, long: float, start_date, end_date):
    # open one monthly Zarr store and select the point and time window
    s3 = s3fs.S3FileSystem()
    s3_path = file['s3_bucket'] + '/' + file['zarr_s3_key']
    zarr_store = s3fs.S3Map(s3_path, s3=s3)
    zarr = xr.open_zarr(store=zarr_store, consolidated=True)
    ds = zarr.sel(latitude=lat,
                  longitude=long,
                  time=slice(start_date.strftime("%Y-%m-%d"),
                             end_date.strftime("%Y-%m-%d")))
    # pull the selected values and matching timestamps into a DataFrame
    # (assumes a single data variable in the store)
    tmp = ds.to_array().values.squeeze()
    df_time = ds.coords['time'].values
    df = pd.DataFrame({'time': df_time, 'lat': lat, 'long': long, 'dat': tmp})
    df.set_index(['time', 'lat', 'long'], inplace=True)
    return df

if __name__ == '__main__':
    client = Client('tcp://xxx')

    start_date = date(2000, 1, 7)
    end_date = date(2000, 10, 20)
    lat = 2
    lon = 10

    # get the s3 locations of the zarr files from the db
    files = get_files()

    # try just running with one file
    res = client.submit(load, files[0], lat, lon, start_date, end_date)

    # run them all
    future = client.map(load, files,
                        repeat(lat), repeat(lon),
                        repeat(start_date), repeat(end_date))
    x = client.gather(future)
    # concat the per-file results together at the end
    df = pd.concat(x)
This code runs fine when I connect the client to just my local machine, but when I try to connect to a remote cluster I get the following error on the xr.open_zarr call:
KeyError: 'XXX/data.zarr/.zmetadata'
I tried changing the code to load the Zarrs outside the method call and pass them in, but that only gave me NaNs as a result. Is there something I am missing? Is this not the correct way to solve what I'm trying to do?
If you just want to extract a time series at a point, you can create a Dask client and then let xarray do the magic in parallel. The example below uses just one Zarr dataset, but as long as the workers stay busy processing the chunks in each Zarr file, you wouldn't gain anything from parsing the Zarr files in parallel; a multi-file sketch follows the example.
import xarray as xr
import fsspec
import hvplot.xarray
from dask.distributed import Client

# start a local Dask cluster (or point Client at an existing scheduler)
client = Client()

url = 's3://mur-sst/zarr'  # Amazon Public Data
ds = xr.open_zarr(fsspec.get_mapper(url, anon=True), consolidated=True)

timeseries = ds['analysed_sst'].sel(time=slice('2015-01-01', '2020-01-01'),
                                    lat=43,
                                    lon=-70).persist()
timeseries.hvplot()
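If you do want to pull the point from many monthly stores, as in the question, here is a minimal sketch of the same idea (assuming `files`, `lat` and `lon` are defined as in the question, and that every store uses `latitude`/`longitude` coordinates and the same variables): open each store lazily, concatenate along time, and only then select the point.

import s3fs
import xarray as xr
from dask.distributed import Client

client = Client()  # workers need read access to the bucket

s3 = s3fs.S3FileSystem()

# lazily open every monthly store; no chunk data is read yet
datasets = [xr.open_zarr(s3fs.S3Map(path, s3=s3), consolidated=True)
            for path in files]

# concatenate along time, then select the single point and time window
ds = xr.concat(datasets, dim='time')
point = ds.sel(latitude=lat, longitude=lon,
               time=slice('2000-01-07', '2000-10-20'))

# only the chunks that touch this point are read, in parallel on the cluster
df = point.to_dataframe()

The point is to keep everything as one lazy xarray selection and let the Dask client schedule the chunk reads, rather than shipping per-file pandas DataFrames back from the workers.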
This produces an interactive time-series plot of the SST at the selected point.
Here is the Full Jupyter Notebook