Filtering with the dask read_parquet method gives unwanted results

I am trying to read parquet files using the dask read_parquet method and the filters kwarg. However, it sometimes doesn't filter according to the given condition.

Example: creating and saving a data frame with a dates column

import pandas as pd
import numpy as np
import dask.dataframe as dd

# five rows with a datetime column
nums  = range(1, 6)
dates = pd.date_range('2018-07-01', periods=5, freq='1d')
df = pd.DataFrame({'dates': dates, 'nums': nums})

# write to the 'test_par' directory; each dask partition becomes its own parquet file
ddf = dd.from_pandas(df, npartitions=3)
ddf.to_parquet('test_par', engine='fastparquet')
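
For reference, the split of the five rows across the three partitions can be checked like this (with 5 rows and npartitions=3, dask typically produces chunks of 2, 2 and 1 rows):

# continuing the snippet above
ddf = dd.from_pandas(df, npartitions=3)
# number of rows in each partition; with 5 rows and npartitions=3
# this typically comes out as 2, 2 and 1
print(ddf.map_partitions(len).compute())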

When I read from the 'test_par' folder and filter on the dates column, it doesn't seem to work:

# keep only rows with dates strictly after 2018-07-04
filters = [('dates', '>', np.datetime64('2018-07-04'))]
df = dd.read_parquet('test_par', engine='fastparquet', filters=filters).compute()

As you can see in the output, 2018-07-03 and 2018-07-04 are still present, even though only 2018-07-05 should remain:

            dates  nums
index
2      2018-07-03     3
3      2018-07-04     4
4      2018-07-05     5

Am I doing something wrong, or should I report this on GitHub?

asked Jul 09 '18 by moshevi




1 Answer

The filters keyword acts row-group-wise (a row group is the parquet term for a set of data rows, analogous to a partition of a dataframe). It does not do any filtering within partitions.

When you use filters, you exclude partitions for which, according to the max/min statistics in the file, no rows can match the given filter. For example, if you specify x > 5, a partition with min=2, max=4 will be excluded, but one with min=2, max=6 will not, even though the latter contains only some rows that meet the filter.
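
To see the statistics that filters compares against, you can inspect the file metadata directly. A minimal sketch using fastparquet (assuming a recent version, where ParquetFile exposes a statistics attribute with per-row-group min/max values):

from fastparquet import ParquetFile

pf = ParquetFile('test_par')
# one min/max entry per row group; `filters` compares against these,
# never against individual rows
print(pf.statistics['min']['dates'])
print(pf.statistics['max']['dates'])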

To filter the data, you should still use the usual syntax

df[df.dates > np.datetime64('2018-07-04')]

in addition to filters, and view the use of filters as an optional optimisation. Without it, Dask would have to read even the partitions with no good data and then apply the condition, yielding no results for those partitions. Better not to load them, if possible.
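
Putting the two together for the example in the question, a sketch of the recommended pattern: let filters prune whole row groups where it can, and apply the same condition as a row-level mask for exactness:

import numpy as np
import dask.dataframe as dd

cutoff = np.datetime64('2018-07-04')

# `filters` may let dask skip entire row groups (an optimisation, not a guarantee)
ddf = dd.read_parquet('test_par', engine='fastparquet',
                      filters=[('dates', '>', cutoff)])

# the row-level mask performs the exact filtering
result = ddf[ddf.dates > cutoff].compute()
print(result)  # only the 2018-07-05 row remains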

answered Sep 25 '22 by mdurant