According to this answer, Dask dataframes can perform smart indexing if Dask knows that the dataframe's index is sorted.
How do I let Dask know if an index is sorted?
In my specific situation I am doing something like this:
for source in sources:
    # This df has a DatetimeIndex that I know to be sorted
    pd = load_pandas_df_from_some_source(source)
    dd = dask.dataframe.from_pandas(pd, chunksize=foo)
    dd.to_hdf(some_unique_filename, '/data')
However, when I do something like this, indexing is incredibly slow:
dd = dask.dataframe.read_hdf(some_glob, '/data')
print(dd.loc['2001-1-1':'2001-1-2'])
I assume that Dask doesn't know that my dataframe is sorted. How do I let it know?
When you load from HDF, the values of the index in each partition are not necessarily known. These values are used to construct the divisions attribute of the dataframe, which is used to accelerate lookups.
For a dataset like yours, you should be able to pass sorted_index=True to read_hdf and get the behaviour you want.
As @kuanb suggested, you may want to try storing in parquet format, which is designed specifically for tabular data. Whether it gives better performance will depend on the nature of your data (HDF was written primarily for numeric data) and on your use case, so your mileage may vary; however, parquet does in general do a good job of keeping metadata statistics about the data values in each partition.
As @mdurant suggests, using the sorted_index= keyword to the read_hdf function is ideal.
More generally you can use the set_index function to set indexes on any dataframe, even those created with other methods. This function has new keywords that allow you to be efficient if your new index column is already sorted and if you know the separating values between partitions already. Here is the current docstring. The last example may be of interest to you:
"""Set the DataFrame index (row labels) using an existing column
This realigns the dataset to be sorted by a new column. This can have a
significant impact on performance, because joins, groupbys, lookups, etc.
are all much faster on that column. However, this performance increase
comes at a cost: sorting a parallel dataset requires expensive shuffles.
Often we ``set_index`` once directly after data ingest and filtering and
then perform many cheap computations off of the sorted dataset.
This function operates exactly like ``pandas.set_index`` except with
different performance costs (it is much more expensive). Under normal
operation this function does an initial pass over the index column to
compute approximate quantiles to serve as future divisions. It then passes
over the data a second time, splitting up each input partition into several
pieces and sharing those pieces to all of the output partitions now in
sorted order.
In some cases we can alleviate those costs, for example if your dataset is
sorted already then we can avoid making many small pieces or if you know
good values to split the new index column then we can avoid the initial
pass over the data. For example if your new index is a datetime index and
your data is already sorted by day then this entire operation can be done
for free. You can control these options with the following parameters.
Parameters
----------
df: Dask DataFrame
index: string or Dask Series
npartitions: int, None, or 'auto'
The ideal number of output partitions. If None use the same as
the input. If 'auto' then decide by memory use.
shuffle: string, optional
Either ``'disk'`` for single-node operation or ``'tasks'`` for
distributed operation. Will be inferred by your current scheduler.
sorted: bool, optional
If the index column is already sorted in increasing order.
Defaults to False
divisions: list, optional
Known values on which to separate index values of the partitions.
See http://dask.pydata.org/en/latest/dataframe-design.html#partitions
Defaults to computing this with a single pass over the data. Note
that if ``sorted=True``, specified divisions are assumed to match
the existing partitions in the data. If this is untrue, you should
leave divisions empty and call ``repartition`` after ``set_index``.
compute: bool
Whether or not to trigger an immediate computation. Defaults to False.
Examples
--------
>>> df2 = df.set_index('x') # doctest: +SKIP
>>> df2 = df.set_index(d.x) # doctest: +SKIP
>>> df2 = df.set_index(d.timestamp, sorted=True) # doctest: +SKIP
A common case is when we have a datetime column that we know to be
sorted and is cleanly divided by day. We can set this index for free
by specifying both that the column is pre-sorted and the particular
divisions along which it is separated
>>> import pandas as pd
>>> divisions = pd.date_range('2000', '2010', freq='1D')
>>> df2 = df.set_index('timestamp', sorted=True, divisions=divisions) # doctest: +SKIP
"""