
Python Dask - dataframe.map_partitions() return value

So dask.dataframe.map_partitions() takes a func argument and the meta kwarg. How exactly does it decide its return type? As an example:

Lots of csv's in ...\some_folder.

import numpy as np
import pandas as pd
import dask.dataframe as dd

ddf = dd.read_csv(r"...\some_folder\*", usecols=['ColA', 'ColB'],
                  blocksize=None,
                  dtype={'ColA': np.float32, 'ColB': np.float32})
example_func = lambda x: x.iloc[-1] / len(x)
metaResult = pd.Series({'ColA': .1234, 'ColB': .1234})
result = ddf.map_partitions(example_func, meta=metaResult).compute()

I'm pretty new to "distributed" computing, but I would intuitively expect this to return a collection of Series objects (a list or dict, most likely), yet the result is a single Series that could be considered a concatenation of the results of example_func on each partition. This in itself would also suffice, if the Series had a MultiIndex indicating the partition label.

From what I can tell from this question, the docs, and the source code itself, this is because ddf.divisions returns (None, None, ..., None) as a result of reading csv's? Is there a dask-native way to do this, or do I need to manually break the returned Series (a concatenation of the Series returned by example_func on each partition) myself?
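For reference, the manual break I have in mind would look roughly like this, assuming the concatenated result holds one (ColA, ColB) pair per partition (the values and partition count below are made up):

```python
import numpy as np
import pandas as pd

# Hypothetical concatenated result: one (ColA, ColB) pair per partition.
flat = pd.Series([0.1, 0.2, 0.3, 0.4],
                 index=['ColA', 'ColB', 'ColA', 'ColB'])

# Rebuild a MultiIndex with a partition level, so each partition's
# result becomes addressable as split.loc[partition_number].
n_partitions = len(flat) // 2
labels = np.repeat(np.arange(n_partitions), 2)
split = flat.set_axis(
    pd.MultiIndex.from_arrays([labels, flat.index],
                              names=['partition', None]))
```
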

Also, feel free to correct my assumptions/practices here, as I'm new to dask.

StarFox asked Nov 17 '16
People also ask

What does map_partitions do in the context of a Dask DataFrame?

map_partitions applies a Python function to each DataFrame partition.

How do I select a row in a Dask DataFrame?

Just like Pandas, Dask DataFrame supports label-based indexing with the .loc accessor for selecting rows or columns, and __getitem__ (square brackets) for selecting just columns. To select rows, the DataFrame's divisions must be known (see Internal Design and Dask DataFrames Best Practices for more information).

Is Dask faster than pandas?

Dask runs faster than pandas for this query, even when the most inefficient column type is used, because it parallelizes the computations. pandas only uses 1 CPU core to run the query. My computer has 4 cores and Dask uses all the cores to run the computation.


1 Answer

So dask.dataframe.map_partitions() takes a func argument and the meta kwarg. How exactly does it decide its return type?

map_partitions tries to concatenate the results returned by func into either a dask DataFrame or a dask Series object in an 'intelligent' way. The decision is based on the return value of func:

  • If func returns a scalar, map_partitions returns a dask Series object.
  • If func returns a pd.Series object, map_partitions returns a dask Series object, in which all pd.Series objects returned by func are concatenated.
  • If func returns a pd.DataFrame, map_partitions returns a dask DataFrame object, in which these pd.DataFrame objects are concatenated along the first axis.

If you are interested in the result of a specific partition, you can use get_partition(). If the partition label is important information for you in general, I would consider assigning a separate column to your ddf directly after reading in the data from csv, containing all the information you need. Afterwards, you can construct func so that it returns a pd.DataFrame with the result of your calculation in one column and the information you need to identify the result in another.

Arco Bast answered Oct 13 '22