 

Parallelize apply after pandas groupby

I have used rosetta.parallel.pandas_easy to parallelize apply after groupby, for example:

```python
import numpy as np
import pandas as pd
from rosetta.parallel.pandas_easy import groupby_to_series_to_frame

df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]}, index=['g1', 'g1', 'g2'])
groupby_to_series_to_frame(df, np.mean, n_jobs=8, use_apply=True, by=df.index)
```

However, has anyone figured out how to parallelize a function that returns a DataFrame? This code fails for rosetta, as expected.

```python
def tmpFunc(df):
    df['c'] = df.a + df.b
    return df

df.groupby(df.index).apply(tmpFunc)
groupby_to_series_to_frame(df, tmpFunc, n_jobs=1, use_apply=True, by=df.index)
```
asked Oct 03 '14 by Ivan

People also ask

Can pandas be parallelized?

TL;DR: Dask DataFrame can parallelize pandas apply() and map() operations, but it can do much more. With Dask's map_partitions(), you can work on each partition of your Dask DataFrame (each partition is itself a pandas DataFrame) while leveraging parallelism for various custom workflows.
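As a minimal sketch of the map_partitions() approach, assuming Dask is installed and reusing the question's toy DataFrame:

```python
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]}, index=['g1', 'g1', 'g2'])
# Split the pandas DataFrame into partitions; each partition is a pandas DataFrame
ddf = dd.from_pandas(df, npartitions=2)

def add_c(part):
    part = part.copy()
    part['c'] = part.a + part.b
    return part

# map_partitions applies add_c to every partition, potentially in parallel
result = ddf.map_partitions(add_c).compute()
print(result)
```

Note that Dask partitions by index ranges rather than by group, so this parallelizes partition-wise work; for strict group-wise semantics you still need a groupby, as in the answers below.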

Does Groupby preserve order python?

Groupby preserves the order of rows within each group. When calling apply, add group keys to index to identify pieces. Reduce the dimensionality of the return type if possible, otherwise return a consistent type.
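A tiny illustration of that guarantee, with hypothetical data: within each group, rows keep the relative order they had in the original frame.

```python
import pandas as pd

df = pd.DataFrame({'k': ['b', 'a', 'b', 'a'], 'v': [1, 2, 3, 4]})
for name, group in df.groupby('k'):
    print(name, list(group.v))
# a [2, 4]
# b [1, 3]
```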

How does pandas Groupby apply work?

The function passed to apply must take a DataFrame as its first argument and return a DataFrame, a Series or a scalar. apply then takes care of combining the results back together into a single DataFrame or Series. apply is therefore a highly flexible grouping method.
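For instance, with the question's toy DataFrame, a scalar-returning function yields a Series while a DataFrame-returning function yields the chunks glued back together (illustrative only):

```python
import pandas as pd

df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]}, index=['g1', 'g1', 'g2'])

# Scalar per group -> Series indexed by group label
print(df.groupby(df.index).apply(lambda g: g.a.sum()))

# DataFrame per group -> chunks combined back into a single DataFrame
print(df.groupby(df.index).apply(lambda g: g.assign(c=g.a + g.b)))
```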

Is pandas Groupby efficient?

Groupby is a very popular function in pandas. It is very good at summarising, transforming, filtering, and a few other essential data analysis tasks.


2 Answers

This seems to work, although it really should be built into pandas:

```python
import multiprocessing

import pandas as pd
from joblib import Parallel, delayed

def tmpFunc(df):
    df['c'] = df.a + df.b
    return df

def applyParallel(dfGrouped, func):
    # Run func on each group in a separate process, then glue the results together
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(
        delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)

if __name__ == '__main__':
    df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]}, index=['g1', 'g1', 'g2'])
    print('parallel version: ')
    print(applyParallel(df.groupby(df.index), tmpFunc))

    print('regular version: ')
    print(df.groupby(df.index).apply(tmpFunc))

    print('ideal version (does not work): ')
    print(df.groupby(df.index).applyParallel(tmpFunc))
```
answered Sep 17 '22 by Ivan

Ivan's answer is great, but it looks like it can be slightly simplified, also removing the need to depend on joblib:

```python
from multiprocessing import Pool, cpu_count

import pandas

def applyParallel(dfGrouped, func):
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for name, group in dfGrouped])
    return pandas.concat(ret_list)
```

By the way: this cannot replace every use of groupby.apply(), but it covers the typical cases: it should handle cases 2 and 3 in the documentation, while the behaviour of case 1 can be obtained by passing the argument axis=1 to the final pandas.concat() call (see the sketch below).
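Here is a minimal sketch of that case-1 variant, assuming the aggregation function reduces each chunk to a Series (applyParallelAgg is a hypothetical name, not part of the original answer):

```python
from multiprocessing import Pool, cpu_count

import pandas

def applyParallelAgg(dfGrouped, func):
    # func must be picklable (defined at module level) for multiprocessing
    names = [name for name, group in dfGrouped]
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for name, group in dfGrouped])
    # axis=1 puts one column per group; keys restores the group labels,
    # and .T moves them back onto the index, like groupby.apply's case-1 output
    return pandas.concat(ret_list, axis=1, keys=names).T
```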

EDIT: the docs have changed; the old version can be found here. In any case, I'm copying the three examples below.

case 1: group DataFrame, apply aggregation function (f(chunk) -> Series), yield DataFrame with group axis having group labels

case 2: group DataFrame, apply transform function (f(chunk) -> DataFrame with same indexes), yield DataFrame with resulting chunks glued together

case 3: group Series, apply function with f(chunk) -> DataFrame, yield DataFrame with result of chunks glued together
answered Sep 18 '22 by Pietro Battiston