When I run essentially the same calculations with dask against zarr data and parquet data, the zarr-based calculations are significantly faster. Why? Is it maybe because I did something wrong when I created the parquet files?
I've replicated the issue with fake data (see below) in a Jupyter notebook to illustrate the kind of behavior I'm seeing. I'd appreciate any insight into why the zarr-based calculation is orders of magnitude faster than the parquet-based one.
The data I'm working with in real life is earth science model data. The particular data parameters are not important, but each parameter can be thought of as an array with latitude, longitude, and time dimensions.
To generate zarr files, I simply write out the multi-dimensional structure of my parameter and its dimensions.
To generate parquet, I first "flatten" the 3-D parameter array into a 1-D array, which becomes a single column in my data frame. I then add latitude, longitude, and time columns before writing the data frame out as parquet.
This cell has all the imports needed for the rest of the code:
import pandas as pd
import numpy as np
import xarray as xr
import dask
import dask.array as da
import intake
from textwrap import dedent
This cell generates the fake data files, which total a bit more than 3 GB:
def build_data(lat_resolution, lon_resolution, ntimes):
    """Build a fake geographical dataset with ntimes time steps and
    resolution lat_resolution x lon_resolution."""
    lats = np.linspace(-90.0 + lat_resolution/2,
                       90.0 - lat_resolution/2,
                       int(np.round(180/lat_resolution)))
    lons = np.linspace(-180.0 + lon_resolution/2,
                       180.0 - lon_resolution/2,
                       int(np.round(360/lon_resolution)))
    times = np.arange(start=1, stop=ntimes+1)
    data = np.random.randn(len(lats), len(lons), len(times))
    return lats, lons, times, data
def create_zarr_from_data_set(lats, lons, times, data, zarr_dir):
    """Write zarr from a data set corresponding to the data passed in."""
    dar = xr.DataArray(data,
                       dims=('lat', 'lon', 'time'),
                       coords={'lat': lats, 'lon': lons, 'time': times},
                       name="data")
    ds = xr.Dataset({'data': dar,
                     'lat': ('lat', lats),
                     'lon': ('lon', lons),
                     'time': ('time', times)})
    ds.to_zarr(zarr_dir)
def create_parquet_from_data_frame(lats, lons, times, data, parquet_file):
    """Write a parquet file from a dataframe corresponding to the data passed in."""
    total_points = len(lats)*len(lons)*len(times)
    # Flatten the data array
    data_flat = np.reshape(data, (total_points, 1))
    # Use meshgrid to create the corresponding latitude, longitude, and time columns
    mesh = np.meshgrid(lats, lons, times, indexing='ij')
    lats_flat = np.reshape(mesh[0], (total_points, 1))
    lons_flat = np.reshape(mesh[1], (total_points, 1))
    times_flat = np.reshape(mesh[2], (total_points, 1))
    df = pd.DataFrame(data=np.concatenate((lats_flat,
                                           lons_flat,
                                           times_flat,
                                           data_flat), axis=1),
                      columns=["lat", "lon", "time", "data"])
    df.to_parquet(parquet_file, engine="fastparquet")
def create_fake_data_files():
    """Create zarr and parquet files with fake data."""
    zarr_dir = "zarr"
    parquet_file = "data.parquet"
    lats, lons, times, data = build_data(0.1, 0.1, 31)
    create_zarr_from_data_set(lats, lons, times, data, zarr_dir)
    create_parquet_from_data_frame(lats, lons, times, data, parquet_file)
    with open("data_catalog.yaml", 'w') as f:
        catalog_str = dedent("""\
            sources:
              zarr:
                args:
                  urlpath: "./{}"
                description: "data in zarr format"
                driver: intake_xarray.xzarr.ZarrSource
                metadata: {{}}
              parquet:
                args:
                  urlpath: "./{}"
                description: "data in parquet format"
                driver: parquet
            """.format(zarr_dir, parquet_file))
        f.write(catalog_str)
##
# Generate the fake data
##
create_fake_data_files()
I ran several different kinds of calculations against the parquet and zarr files, but for simplicity in this example, I'll just pull a single parameter value out at a particular time, latitude, and longitude.
This cell builds the zarr and parquet directed acyclic graphs (DAGs) for the calculation:
# pick some arbitrary point to pull out of the data
lat_value = -0.05
lon_value = 10.95
time_value = 5
# open the data
cat = intake.open_catalog("data_catalog.yaml")
data_zarr = cat.zarr.to_dask()
data_df = cat.parquet.to_dask()
# build the DAG for getting a single point out of the zarr data
time_subset = data_zarr.where(data_zarr.time==time_value,drop=True)
lat_condition = da.logical_and(time_subset.lat < lat_value + 1e-9, time_subset.lat > lat_value - 1e-9)
lon_condition = da.logical_and(time_subset.lon < lon_value + 1e-9, time_subset.lon > lon_value - 1e-9)
geo_condition = da.logical_and(lat_condition,lon_condition)
zarr_subset = time_subset.where(geo_condition,drop=True)
# build the DAG for getting a single point out of the parquet data
parquet_subset = data_df[(data_df.lat > lat_value - 1e-9) &
                         (data_df.lat < lat_value + 1e-9) &
                         (data_df.lon > lon_value - 1e-9) &
                         (data_df.lon < lon_value + 1e-9) &
                         (data_df.time == time_value)]
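As an aside, the same zarr point can also be pulled with xarray's label-based indexing; I kept the where-based version above so the logic mirrors the parquet filter, but the following sketch is equivalent:
# Equivalent label-based lookup on the zarr-backed dataset; method="nearest"
# avoids the explicit floating-point tolerance comparisons.
zarr_point_sel = data_zarr.sel(lat=lat_value, lon=lon_value,
                               time=time_value, method="nearest")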
When I use %%time to time the compute() call for each DAG, I get wildly different results. The zarr-based subset takes less than a second; the parquet-based subset takes 15-30 seconds.
This cell does the zarr-based calculation:
%%time
zarr_point = zarr_subset.compute()
Zarr-based calculation time:
CPU times: user 6.19 ms, sys: 5.49 ms, total: 11.7 ms
Wall time: 12.8 ms
This cell does the parquet-based calculation:
%%time
parquet_point = parquet_subset.compute()
Parquet-based calculation time:
CPU times: user 18.2 s, sys: 28.1 s, total: 46.2 s
Wall time: 29.3 s
As you can see, the zarr-based calculation is much, much faster. Why?
Glad to see fastparquet, zarr and intake used in the same question!
TL;DR: use the data model that is appropriate for your task.
Also, it's worth pointing out that the zarr dataset is 1.5GB, blosc/lz4 compressed in 512 chunks, and the parquet dataset is 1.8GB, snappy compressed in 5 chunks, both using the default compression settings. The random data does not compress well; the coordinates do.
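You can check those layouts directly; a rough sketch, assuming the file names used above and zarr v2 / fastparquet attribute names (these may differ between versions):
import zarr
import fastparquet

# Chunk shape, number of chunks and compressor of the zarr store
z = zarr.open("zarr")["data"]
print(z.chunks, z.nchunks, z.compressor)

# Number of row groups (the unit of chunking in parquet)
pf = fastparquet.ParquetFile("data.parquet")
print(len(pf.row_groups))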
zarr is an array-oriented format and can be chunked along any dimension, which means that, to read a single point, you only need the metadata (which is very brief text) plus the one chunk that contains it, which must then be decompressed. The indexing of the data chunks is implicit.
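As a minimal sketch of what that means in practice, the same point can be read straight from the zarr store; the integer indices below are the positions of lat=-0.05, lon=10.95 and time=5 in the coordinate arrays built above:
import zarr

# Reading one element touches the store metadata plus the single chunk
# that contains the point; only that one chunk is decompressed.
z = zarr.open("zarr")["data"]
print(z[899, 1909, 4])   # lat index 899, lon index 1909, time index 4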
parquet is a column-oriented format. To find a specific point, you may be able to ignore some chunks based on the min/max column metadata for each chunk, depending on how the coordinate columns are organised, and then load the column chunk for the random data and decompress it. You would need custom logic to select chunks for loading on multiple columns simultaneously, which Dask does not currently implement (and which would not be possible without carefully reordering your data). The metadata for parquet is much larger than for zarr, but both are insignificant in this case; if you had many variables or more coordinates, this might become an extra issue for parquet.
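With the fake data above you can see why the statistics barely help: the frame was written in lat-major order, so each of the few row groups covers a narrow band of lat but the full range of lon and time, and only a lat predicate could prune anything. A rough sketch, assuming fastparquet's statistics accessor and that min/max statistics were written (fastparquet does this by default):
import fastparquet

pf = fastparquet.ParquetFile("data.parquet")
stats = pf.statistics   # per-row-group min/max for each column
for rg in range(len(pf.row_groups)):
    print("row group", rg,
          "lat", stats["min"]["lat"][rg], "to", stats["max"]["lat"][rg],
          "| lon", stats["min"]["lon"][rg], "to", stats["max"]["lon"][rg],
          "| time", stats["min"]["time"][rg], "to", stats["max"]["time"][rg])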
In this case random access will be much faster for zarr, but reading all of the data is not radically different, since both formats must load all the bytes on disc and decompress them into floats, and in both cases the coordinate data loads quickly. However, the in-memory representation of the decompressed dataframe is much larger than that of the decompressed array: instead of a small 1-D array per coordinate, the dataframe has a coordinate column with as many values as there are data points. Furthermore, finding a particular point means indexing the small coordinate arrays in the array case, but comparing against the lat/lon/time values of every single row in the dataframe case.
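A back-of-envelope calculation for the fake data above makes the in-memory difference concrete (everything is float64 once decompressed):
nlat, nlon, ntime = 1800, 3600, 31
npoints = nlat * nlon * ntime              # 200,880,000 values

# Array representation: the data cube plus three small 1-D coordinate arrays
array_bytes = npoints * 8 + (nlat + nlon + ntime) * 8

# Dataframe representation: four float64 columns, each npoints rows long
frame_bytes = 4 * npoints * 8

print(array_bytes / 1e9)   # ~1.6 GB
print(frame_bytes / 1e9)   # ~6.4 GB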