
Groupby and shift a dask dataframe

Tags:

python

dask

I would like to scale some operations I do on a pandas DataFrame using Dask 2.14. For example, I would like to apply a shift to a column of a DataFrame:

import dask.dataframe as dd

data = dd.read_csv('some_file.csv')
# set_index is not in-place in Dask, so the result must be reassigned
data = data.set_index('column_A')
data['column_B'] = data.groupby(['column_A'])['column_B'].shift(-1)

but I get AttributeError: 'SeriesGroupBy' object has no attribute 'shift'. I read the Dask documentation and saw that there is no such method (while there is one in pandas).

Can you suggest a valid alternative?

Thank you

Asked May 05 '20 by Luca Monno


People also ask

Is Dask faster than pandas?

Let's start with the simplest operation: reading a single CSV file. Surprisingly, there is already a huge difference in this most basic operation: datatable is 70% faster than pandas, while Dask is 500% faster. The results are all DataFrame objects with very similar interfaces.
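
To illustrate those "very similar interfaces", here is a minimal sketch reusing the hypothetical some_file.csv and column_B from the question; the only visible difference is Dask's lazy .compute() step:

import dask.dataframe as dd
import pandas as pd

pdf = pd.read_csv('some_file.csv')   # eager: loads everything into memory
ddf = dd.read_csv('some_file.csv')   # lazy: builds a task graph over chunks

# The same method calls work on both objects.
print(pdf['column_B'].mean())
print(ddf['column_B'].mean().compute())  # Dask materializes only on .compute()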

How do I partition a Dask DataFrame?

The .partitions accessor allows partition-wise slicing of a Dask DataFrame. You can perform normal NumPy-style slicing, but rather than slicing elements of the array you slice along partitions; for example, df.partitions[:5] produces a new Dask DataFrame of the first five partitions.
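
A minimal sketch of that slicing (the toy frame and partition counts are illustrative):

import dask.dataframe as dd
import pandas as pd

# A small frame split into 10 partitions so there is something to slice.
ddf = dd.from_pandas(pd.DataFrame({'a': range(100)}), npartitions=10)

first_five = ddf.partitions[:5]  # a new Dask DataFrame over partitions 0-4
print(ddf.npartitions, first_five.npartitions)  # 10 5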

Is Dask apply parallel?

Dask helps developers scale their entire Python ecosystem, and it can run on your laptop or on a container cluster. Dask is a one-stop solution for larger-than-memory data sets, as it provides multicore and distributed parallel execution.
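
The parallelism is easiest to see with map_partitions, which applies a pandas-level function to every partition; a minimal sketch with a toy frame:

import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(pd.DataFrame({'a': range(8)}), npartitions=4)

# The lambda receives one pandas DataFrame per partition; the four
# calls can run in parallel across threads, processes, or workers.
result = ddf.map_partitions(lambda pdf: pdf.assign(b=pdf['a'] * 2))
print(result.compute())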

How many partitions should I have in Dask?

You should aim for partitions that hold around 100 MB of data each. Additionally, reducing the number of partitions is very helpful just before shuffling, which creates n log(n) tasks relative to the number of partitions. DataFrames with fewer than 100 partitions are much easier to shuffle than DataFrames with tens of thousands.
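
Adjusting the partition count is a one-liner with repartition; a minimal sketch (the counts are illustrative):

import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(pd.DataFrame({'a': range(1000)}), npartitions=100)

# Collapse to fewer, larger partitions, e.g. before an expensive shuffle.
smaller = ddf.repartition(npartitions=10)
print(smaller.npartitions)  # 10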


1 Answer

There is an open ticket about this on GitHub. Essentially, you will have to use apply to get around it. I'm not sure whether this carries performance implications in Dask. There is a further ticket referencing the issue and stating that it actually lies in pandas, but it has been open for some time.

This should be equivalent to the pandas operation:

import dask.dataframe as dd
import pandas as pd
import random

df = pd.DataFrame({'a': list(range(10)),
                   'b': random.choices(['x', 'y'], k=10)})

print("####### PANDAS ######")
print("Initial df")
print(df.head(10))
print("................")

pandas_df = df.copy()
print("Final df")

# equivalent to groupby(...)['a'].shift(-1), expressed via apply
pandas_df['a'] = pandas_df.groupby(['b'])['a'].apply(lambda x: x.shift(-1))

print(pandas_df.head(10))
print()


print("####### DASK ######")
print("Initial df")
dask_df = dd.from_pandas(df, npartitions=1).reset_index()
print(dask_df.head(10))
print("................")

# shift within each group via apply, since SeriesGroupBy.shift is unavailable
dask_df['a'] = dask_df.groupby(['b'])['a'].apply(lambda x: x.shift(-1))

print("Final df")
print(dask_df.head(10))
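
Dask will typically warn that it has to infer the output metadata of the apply. If that is the case for your version, passing meta= should silence it; a hedged sketch (here 'a' is the column name from the example above and 'f8' the float dtype, since shift(-1) introduces NaN):

dask_df['a'] = dask_df.groupby(['b'])['a'].apply(
    lambda x: x.shift(-1),
    meta=('a', 'f8'),  # (name, dtype) of the returned series
)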

I obviously can't benchmark the apply approach against groupby shift in Dask, since there is no alternative to compare it to. However, I can in pandas:

import string

import numpy as np
import pandas as pd


df = pd.DataFrame({'a': list(range(100000)),
                   'b': np.random.choice(list(string.ascii_lowercase), 100000)
                   })

def normal_way(df):
    return df.groupby(['b'])['a'].shift(-1)

def apply_way(df):
    return df.groupby(['b'])['a'].apply(lambda x: x.shift(-1))

The timeit results show the apply workaround is roughly 3.5x slower in pandas:

%timeit normal_way(df)
4.25 ms ± 98 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

%timeit apply_way(df)
15 ms ± 446 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Answered Sep 18 '22 by roganjosh