On Dask DataFrame.apply(), receiving n rows of value 1 before actual rows processed

In the code snippet below, I would expect the logs to print the numbers 0-4. I understand that the numbers may not appear in that order, since the task would be broken up into a number of parallel operations.

Code snippet:

from dask import dataframe as dd
import numpy as np
import pandas as pd

# Every column holds the values 0-4.
df = pd.DataFrame({'A': np.arange(5),
                   'B': np.arange(5),
                   'C': np.arange(5)})

ddf = dd.from_pandas(df, npartitions=1)

def aggregate(x):
    # Log the B value of each row this function receives.
    print('B val received: ' + str(x.B))
    return x

ddf.apply(aggregate, axis=1).compute()

But when the above code is run, I see this instead:

B val received: 1
B val received: 1
B val received: 1
B val received: 0
B val received: 0
B val received: 1
B val received: 2
B val received: 3
B val received: 4

Instead of 0-4, I see a series of 1s printed first, plus an extra 0. I have noticed these "extra" rows of value 1 every time I have set up a Dask DataFrame and run an apply operation on it.

Printing the dataframe shows no additional rows with value 1 throughout:

   A  B  C
0  0  0  0
1  1  1  1
2  2  2  2
3  3  3  3
4  4  4  4

My question is: where are these rows of value 1 coming from? Why do they consistently appear before the "actual" rows of the dataframe? The 1s seem unrelated to the values in the real rows (that is, it is not as though Dask is for some reason grabbing the second row an extra few times).

asked Apr 14 '17 by kuanb


2 Answers

@Grr's answer is correct. Dask.dataframe doesn't know what your function will produce, but it still has to hand you back a lazy dask.dataframe with the correct columns and dtypes, so it tries your function on a small sample of data.

You can avoid these checks by providing metadata about your intended output using the meta= keyword (more details in the DataFrame.apply docstring). If you provide this information then Dask.dataframe will not need to try your function to determine types.

Copying this section here:

Docstring

meta : pd.DataFrame, pd.Series, dict, iterable, tuple, optional

An empty pd.DataFrame or pd.Series that matches the dtypes and column names of the output. This metadata is necessary for many algorithms in dask dataframe to work. For ease of use, some alternative inputs are also available. Instead of a DataFrame, a dict of {name: dtype} or iterable of (name, dtype) can be provided. Instead of a series, a tuple of (name, dtype) can be used. If not provided, dask will try to infer the metadata. This may lead to unexpected results, so providing meta is recommended. For more information, see dask.dataframe.utils.make_meta.
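As a quick sketch of those alternative spellings (assuming the same three int64 columns as the question; the dict and tuple forms come straight from the docstring above):

import numpy as np
import pandas as pd
from dask import dataframe as dd

df = pd.DataFrame({'A': np.arange(5), 'B': np.arange(5), 'C': np.arange(5)})
ddf = dd.from_pandas(df, npartitions=1)

# DataFrame output: meta given as a dict of {column name: dtype}
ddf.apply(lambda row: row, axis=1,
          meta={'A': 'int64', 'B': 'int64', 'C': 'int64'}).compute()

# Series output: meta given as a (name, dtype) tuple
ddf.apply(lambda row: row.B, axis=1, meta=('B', 'int64')).compute()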

Solution

So if you create an example output as an empty dataframe with matching column names and dtypes then you'll be fine:

meta = pd.DataFrame({'A': pd.Series(dtype='int64'),
                     'B': pd.Series(dtype='int64'),
                     'C': pd.Series(dtype='int64')})
ddf.apply(aggregate, axis=1, meta=meta)

Or, in this case, because your function doesn't change the columns or dtypes of the input, you can reuse the input's own metadata by passing the collection itself; Dask will extract the empty meta frame from it:

ddf.apply(aggregate, axis=1, meta=ddf)
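
A quick sanity check (a sketch; the exact print order can vary with the scheduler): once meta is supplied, Dask no longer needs to trial-run your function, so only the five real rows should be logged.

ddf.apply(aggregate, axis=1, meta=ddf).compute()
# Expected log lines (order may vary):
# B val received: 0
# B val received: 1
# B val received: 2
# B val received: 3
# B val received: 4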

answered Nov 01 '22 by MRocklin

Dask does some checking on what you have told it to do before it runs it on the entire collection of partitions. That is where the first few print statements come from. It's part of the built-in error checking that prevents Dask from going down some long-winded series of operations only to fail at the end.
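
To see where the specific value 1 comes from, you can peek at the sample data Dask uses for this trial run. This is an internals peek, not a public API: _meta_nonempty is a private Dask attribute and its placeholder values could change between versions, but for integer columns the fake partition is filled with 1s, which matches the stray log lines in the question.

import numpy as np
import pandas as pd
from dask import dataframe as dd

df = pd.DataFrame({'A': np.arange(5), 'B': np.arange(5), 'C': np.arange(5)})
ddf = dd.from_pandas(df, npartitions=1)

# Private attribute: the tiny fake partition Dask uses when it trial-runs
# your function. Its int64 columns hold the placeholder value 1.
print(ddf._meta_nonempty)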

answered Nov 01 '22 by Grr