So dask.dataframe.map_partitions() takes a func argument and the meta kwarg. How exactly does it decide its return type? As an example:
Lots of csv's in ...\some_folder.
import numpy as np
import pandas as pd
import dask.dataframe as dd

ddf = dd.read_csv(r"...\some_folder\*", usecols=['ColA', 'ColB'],
                  blocksize=None,
                  dtype={'ColA': np.float32, 'ColB': np.float32})
example_func = lambda x: x.iloc[-1] / len(x)
metaResult = pd.Series({'ColA': .1234, 'ColB': .1234})
result = ddf.map_partitions(example_func, meta=metaResult).compute()
I'm pretty new to "distributed" computing, but I would intuitively expect this to return a collection (a list or dict, most likely) of Series objects, yet the result is a Series object that could be considered a concatenation of the results of example_func on each partition. This in and of itself would also suffice, if this series had a MultiIndex to indicate the partition label.
From what I can tell from this question, the docs, and the source code itself, this is because ddf.divisions will return (None, None, ..., None) as a result of reading csv's. Is there a dask-native way to do this, or do I need to manually break up the returned Series (a concatenation of the Series returned by example_func on each partition) myself?
Also, feel free to correct my assumptions/practices here, as I'm new to dask.
So dask.dataframe.map_partitions() takes a func argument and the meta kwarg. How exactly does it decide its return type?
map_partitions tries to concatenate the results returned by func into either a dask DataFrame or a dask Series object in an 'intelligent' way. The decision is based on the return value of func:

- If func returns a scalar, map_partitions returns a dask Series object.
- If func returns a pd.Series object, map_partitions returns a dask Series object in which all the pd.Series objects returned by func are concatenated.
- If func returns a pd.DataFrame, map_partitions returns a dask DataFrame object in which these pd.DataFrame objects are concatenated along the first axis.

If you are interested in the result of a particular partition, you can use get_partition().
If the partition label is in general important information for you, I would consider assigning a separate column to your ddf directly after reading in the data from csv, containing all the information you need. Afterwards, you could construct func so that it returns a pd.DataFrame with the result of your calculation in one column and the information you need to identify the result in another.