I am trying to read parquet files using the dask read_parquet method and the filters kwarg. However, it sometimes doesn't filter according to the given condition.
Example: creating and saving a data frame with a dates column:
import pandas as pd
import numpy as np
import dask.dataframe as dd

nums = range(1, 6)
dates = pd.date_range('2018-07-01', periods=5, freq='1d')
df = pd.DataFrame({'dates': dates, 'nums': nums})
dd.from_pandas(df, npartitions=3).to_parquet('test_par', engine='fastparquet')
When I read and filter on the dates column from the 'test_par' folder, it doesn't seem to work:
filters=[('dates', '>', np.datetime64('2018-07-04'))]
df = dd.read_parquet('test_par', engine='fastparquet', filters=filters).compute()
As you can see in the output, 2018-07-03 and 2018-07-04 are present.
+-------+------------+------+
| index | dates      | nums |
+-------+------------+------+
| 2     | 2018-07-03 | 3    |
| 3     | 2018-07-04 | 4    |
| 4     | 2018-07-05 | 5    |
+-------+------------+------+
Am I doing something wrong? Or should I report this on GitHub?
The filters keyword is a row-group-wise action (row-group is the Parquet term for a set of data rows, like a partition for a dataframe). It does not do any filtering within partitions.
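If you want to see what the filter has to work with, you can inspect the row-group statistics stored in the file. A hedged sketch, assuming the dataset above was written to 'test_par' and that fastparquet's ParquetFile.statistics property is available:

from fastparquet import ParquetFile

pf = ParquetFile('test_par')
print(len(pf.row_groups))             # how many row-groups dask wrote
print(pf.statistics['min']['dates'])  # per-row-group minimum of the dates column
print(pf.statistics['max']['dates'])  # per-row-group maximum of the dates column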
When you use filters, you exclude partitions in which, according to the max/min statistics in the file, no rows can match the given filter. For example, if you specify x > 5, a partition with min=2, max=4 will be excluded, but one with min=2, max=6 will not, even though the latter contains only some rows that meet the filter.
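A toy illustration of that exclusion rule (not fastparquet's actual implementation), for a filter like ('x', '>', 5):

def can_skip_row_group(col_min, col_max, value):
    # A row-group can only be skipped when no row in it could satisfy x > value,
    # i.e. even the group's maximum fails the condition.
    return col_max <= value

print(can_skip_row_group(2, 4, 5))  # True  -> row-group excluded
print(can_skip_row_group(2, 6, 5))  # False -> row-group kept, even though only
                                    #          some of its rows match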
To actually filter the data, you should still use the usual syntax

df[df.dates > np.datetime64('2018-07-04')]

in addition to filters, and view filters as an optional optimisation. Without it, Dask would have to read even the partitions that contain no matching data and then apply the condition, producing no results for those partitions. Better not to load them at all, if possible.
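Putting the two together, a minimal sketch of the recommended pattern (reusing the 'test_par' directory and cutoff from the question): pass filters as a row-group-level optimisation hint, then apply the real row-level condition yourself.

import numpy as np
import dask.dataframe as dd

cutoff = np.datetime64('2018-07-04')

ddf = dd.read_parquet(
    'test_par',
    engine='fastparquet',
    filters=[('dates', '>', cutoff)],  # may let dask skip whole row-groups
)

# The explicit row-level filter is what guarantees a correct result.
result = ddf[ddf.dates > cutoff].compute()
print(result)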