I have used rosetta.parallel.pandas_easy to parallelize apply after groupby, for example:
    import numpy as np
    import pandas as pd
    from rosetta.parallel.pandas_easy import groupby_to_series_to_frame

    df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]}, index=['g1', 'g1', 'g2'])
    groupby_to_series_to_frame(df, np.mean, n_jobs=8, use_apply=True, by=df.index)
However, has anyone figured out how to parallelize a function that returns a DataFrame? This code fails for rosetta, as expected.
    def tmpFunc(df):
        df['c'] = df.a + df.b
        return df

    df.groupby(df.index).apply(tmpFunc)
    groupby_to_series_to_frame(df, tmpFunc, n_jobs=1, use_apply=True, by=df.index)
TL;DR: Dask DataFrame can parallelize pandas apply() and map() operations, but it can do much more. With Dask's map_partitions(), you can run a function on each partition of your Dask DataFrame (each partition is itself a pandas DataFrame) and so apply parallelism to custom workflows.
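From the pandas documentation for groupby: groupby preserves the order of rows within each group. The group_keys option controls whether, when calling apply, the group keys are added to the index to identify the pieces. The older squeeze option reduces the dimensionality of the return type if possible, and otherwise returns a consistent type.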
The function passed to apply must take a dataframe as its first argument and return a dataframe, a series or a scalar. apply will then take care of combining the results back together into a single dataframe or series. apply is therefore a highly flexible grouping method.
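A small sketch of the three return types mentioned above, run serially on the toy frame from the question. The lambdas and the column names a_max and b_min are made up for illustration, and group_keys=False is my choice so that the DataFrame case keeps the original index.

```python
import pandas as pd

df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]}, index=['g1', 'g1', 'g2'])
grouped = df.groupby(level=0, group_keys=False)

# Scalar per group -> Series indexed by group label.
total = grouped.apply(lambda g: (g.a + g.b).sum())

# Series per group -> DataFrame with one row per group.
stats = grouped.apply(
    lambda g: pd.Series({'a_max': g.a.max(), 'b_min': g.b.min()}))

# DataFrame per group -> the pieces are concatenated back together.
with_c = grouped.apply(lambda g: g.assign(c=g.a + g.b))

print(total)
print(stats)
print(with_c)
```

The DataFrame case is the one the question is about: each group yields a whole frame, and the combining step is essentially a concat.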
Groupby is a very popular function in pandas. It is well suited to summarising, transforming, filtering, and several other essential data analysis tasks.
This seems to work, although it really should be built in to pandas:
    import multiprocessing

    import pandas as pd
    from joblib import Parallel, delayed

    def tmpFunc(df):
        df['c'] = df.a + df.b
        return df

    def applyParallel(dfGrouped, func):
        # Run func on every group in a worker, then concatenate the pieces
        retLst = Parallel(n_jobs=multiprocessing.cpu_count())(
            delayed(func)(group) for name, group in dfGrouped)
        return pd.concat(retLst)

    if __name__ == '__main__':
        df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]}, index=['g1', 'g1', 'g2'])
        print('parallel version: ')
        print(applyParallel(df.groupby(df.index), tmpFunc))
        print('regular version: ')
        print(df.groupby(df.index).apply(tmpFunc))
        print('ideal version (does not work): ')
        # Raises AttributeError: groupby objects have no applyParallel method
        print(df.groupby(df.index).applyParallel(tmpFunc))
Ivan's answer is great, but it looks like it can be slightly simplified, also removing the need to depend on joblib:
    from multiprocessing import Pool, cpu_count

    import pandas

    def applyParallel(dfGrouped, func):
        # Send each group to a worker process, then concatenate the results
        with Pool(cpu_count()) as p:
            ret_list = p.map(func, [group for name, group in dfGrouped])
        return pandas.concat(ret_list)
By the way: this cannot replace every groupby.apply(), but it will cover the typical cases: e.g. it should cover cases 2 and 3 in the documentation, while you should obtain the behaviour of case 1 by giving the argument axis=1 to the final pandas.concat() call.
EDIT: the docs changed; the old version can be found here, in any case I'm copypasting the three examples below.
case 1:
    group DataFrame
    apply aggregation function (f(chunk) -> Series)
    yield DataFrame, with group axis having group labels

case 2:
    group DataFrame
    apply transform function (f(chunk) -> DataFrame with same indexes)
    yield DataFrame with resulting chunks glued together

case 3:
    group Series
    apply function with f(chunk) -> DataFrame
    yield DataFrame with result of chunks glued together
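For case 1, here is a rough sketch of what the axis=1 suggestion looks like in practice. It runs serially so the combining step is easy to see; in the parallel version the list of pieces would come from p.map() instead of the loop. The summarize function is a made-up aggregation, and passing keys= and transposing are my own additions to get the group labels back on the row axis.

```python
import pandas as pd


def summarize(chunk):
    # Hypothetical aggregation for case 1: f(chunk) -> Series
    return chunk.sum()


df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]}, index=['g1', 'g1', 'g2'])
grouped = df.groupby(df.index)

names = []
pieces = []
for name, group in grouped:
    names.append(name)
    pieces.append(summarize(group))

# axis=1 lines the per-group Series up side by side (one column per group);
# keys labels the columns, and .T puts the groups back on the rows.
combined = pd.concat(pieces, axis=1, keys=names).T

# The plain pandas result, for comparison.
expected = grouped.apply(summarize)

print(combined)
print(expected)
```

Without keys= the columns are just numbered 0, 1, ..., so the group labels would be lost.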