I would like to scale some operations I do on a pandas DataFrame using dask 2.14. For example, I would like to apply a shift on a column of a dataframe:
import dask.dataframe as dd
data = dd.read_csv('some_file.csv')
data.set_index('column_A')
data['column_B'] = data.groupby(['column_A'])['column_B'].shift(-1)
but I get AttributeError: 'SeriesGroupBy' object has no attribute 'shift'
I read the dask documentation and saw that there is no such method (while there is one in pandas).
Can you suggest a valid alternative?
Thank you
There is an open ticket about this on GitHub. Essentially, you will have to use apply to get around it. I'm not sure whether this carries performance implications in dask. There is a further ticket referencing the issue and stating that it lies in pandas, but it's been open for some time.
This should be equivalent to the pandas operation:
import dask.dataframe as dd
import pandas as pd
import random

# Small example frame: a numeric column 'a' and a grouping column 'b'
df = pd.DataFrame({'a': list(range(10)),
                   'b': random.choices(['x', 'y'], k=10)})

print("####### PANDAS ######")
print("Initial df")
print(df.head(10))
print("................")

# Shift 'a' by -1 within each group, using apply instead of the missing shift
pandas_df = df.copy()
pandas_df['a'] = pandas_df.groupby(['b'])['a'].apply(lambda x: x.shift(-1))
print("Final df")
print(pandas_df.head(10))
print()

print("####### DASK ######")
print("Initial df")
dask_df = dd.from_pandas(df, npartitions=1).reset_index()
print(dask_df.head(10))
print("................")

# Same workaround on the dask dataframe
dask_df['a'] = dask_df.groupby(['b'])['a'].apply(lambda x: x.shift(-1))
print("Final df")
print(dask_df.head(10))
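As a side note, dask generally can't infer the output type of a custom apply, so the dask line above may print a warning asking for meta. If it does, one way to silence it (a minimal sketch; the float64 dtype is an assumption based on shift(-1) introducing NaN into the integer column) is to pass meta explicitly:

# Passing meta tells dask the name and dtype of the result of the apply,
# so it doesn't have to guess them from a sample computation.
# float64 is assumed here because shift(-1) inserts NaN into column 'a'.
dask_df['a'] = dask_df.groupby(['b'])['a'].apply(
    lambda x: x.shift(-1),
    meta=('a', 'float64')
)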
I obviously can't benchmark the apply approach in dask, since there seems to be no alternative to compare it against. However, I can in pandas:
import string
import numpy as np
import pandas as pd

df = pd.DataFrame({'a': list(range(100000)),
                   'b': np.random.choice(list(string.ascii_lowercase), 100000)})

def normal_way(df):
    df = df.groupby(['b'])['a'].shift(-1)

def apply_way(df):
    df = df.groupby(['b'])['a'].apply(lambda x: x.shift(-1))
The timeit results are:
%timeit normal_way(df)
4.25 ms ± 98 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit apply_way(df)
15 ms ± 446 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
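For completeness, if you want to time the dask apply itself (rather than compare it against a shift that doesn't exist there), keep in mind that dask is lazy, so nothing runs until you call compute(). A minimal sketch, reusing the dask_df built above, could look like this:

import timeit

def dask_apply_way(ddf):
    # .compute() forces dask to actually execute the lazy groupby/apply graph
    return ddf.groupby(['b'])['a'].apply(lambda x: x.shift(-1)).compute()

# Total time for 10 runs, in seconds
print(timeit.timeit(lambda: dask_apply_way(dask_df), number=10))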