
group-by/apply with Pandas and Multiprocessing

I am trying to do a groupby and apply operation on a pandas dataframe using multiprocessing (in the hope of speeding up my code). For example, if I have a dataframe like the following:

            A  B  C
cluster_id         
1           1  2  3
1           1  2  3
2           4  5  6
2           7  8  9

I would like to apply a function to the columns, grouped by cluster_id. In a simple case where the function is just the sum

def my_func(x):
    return sum(x)

then the operation should yield:

            A   B   C
cluster_id         
1           2   4   6
2           11  13  15
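Without multiprocessing I can already get this result with a plain groupby, e.g.:

# single-process version of the operation I am trying to parallelise
df.groupby(level=0).sum()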

There are some similar posts on SO; I did manage to get somewhere close but haven't really solved it. My code fails and I don't know how to fix it. This is what I have come up with:

import multiprocessing as mp
import pandas as pd
import numpy as np


def _apply_df(args):
    df, func = args
    return df.groupby(level=0).apply(func)


def mp_apply(df, func):
    workers = 4
    pool = mp.Pool(processes=workers)
    split_dfs = np.array_split(df, workers, axis=1)
    result = pool.map(_apply_df, [(d, func) for d in split_dfs])
    pool.close()
    result = sorted(result, key=lambda x: x[0])
    return pd.concat([i[1] for i in result])


def my_func(x):
    return sum(x)


if __name__ == '__main__':
    df = pd.DataFrame([[1, 2, 3, 1], [1, 2, 3, 1], [4, 5, 6, 2], [7, 8, 9, 2]], columns=['A', 'B', 'C', 'cluster_id'])
    df = df.set_index('cluster_id')
    out = mp_apply(df, my_func)
    print(out)

I am getting the error:

  TypeError: unsupported operand type(s) for +: 'int' and 'str'

and it looks like it fails on the line

result = pool.map(_apply_df, [(d, func) for d in split_dfs])

The parameter d that is passed into _apply_df appears to be empty.

Any help/ideas highly appreciated. I am using Python 3.6 if that matters. Thanks!

asked Oct 14 '25 by Aenaon


1 Answer

There are two main issues in your code:

  1. The use of Python's built-in sum function. This is a function that takes an iterable of numbers and returns their sum, e.g. if you try to sum a slice of the dataframe df, you will get the same error traceback:

sum(df.loc[1])

TypeError                                 Traceback (most recent call last)
    <ipython-input-60-6dea0ab0880f> in <module>()
    ----> 1 sum(df.loc[1])
TypeError: unsupported operand type(s) for +: 'int' and 'str'
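The string in that error comes from the column labels: iterating over a DataFrame yields its column names, so the built-in sum ends up computing 0 + 'A'. A quick way to check, using the same df:

list(df.loc[1])
# ['A', 'B', 'C']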

To fix this, you'll need to use the pandas sum method instead, as shown below:

df.loc[1].sum()

#output 
A    2
B    4
C    6
dtype: int64

As you can see, this produces the expected result, i.e. it sums the columns in the data slice.
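With that change, your my_func only needs to return x.sum() instead of sum(x). A quick single-process sanity check before bringing multiprocessing back in:

def my_func(x):
    return x.sum()

print(df.groupby(level=0).apply(my_func))
#              A   B   C
# cluster_id
# 1            2   4   6
# 2           11  13  15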

  2. The second issue is the "reduce" stage. Each process returns a single dataframe, but the lines

    result = sorted(result, key=lambda x: x[0])

    return pd.concat([i[1] for i in result])

The first line will produce an error because none of the returned dataframes has a column called 0 to sort on; the second line has a similar issue. Both can be replaced as follows:

return pd.concat(result, axis=1)

Now the code will run without issue given the data being used.
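For intuition, pd.concat with axis=1 glues the per-worker column chunks back together side by side, aligning them on the cluster_id index. A small standalone sketch (hypothetical chunk values, just to illustrate the reassembly):

import pandas as pd

idx = pd.Index([1, 2], name='cluster_id')
chunk_a = pd.DataFrame({'A': [2, 11]}, index=idx)                 # result from one worker
chunk_bc = pd.DataFrame({'B': [4, 13], 'C': [6, 15]}, index=idx)  # result from another

print(pd.concat([chunk_a, chunk_bc], axis=1))
#              A   B   C
# cluster_id
# 1            2   4   6
# 2           11  13  15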

The overall code:

import multiprocessing as mp
import pandas as pd
import numpy as np


def _apply_df(args):
    # unpack the (dataframe chunk, function) pair and run the groupby/apply on it
    df, func = args
    return df.groupby(level=0).apply(func)


def mp_apply(df, func):
    workers = 4
    pool = mp.Pool(processes=workers)
    # split the dataframe column-wise so each worker gets a subset of the columns
    split_dfs = np.array_split(df, workers, axis=1)
    # each worker runs the groupby/apply on its own column chunk
    result = pool.map(_apply_df, [(d, func) for d in split_dfs])
    pool.close()
    # result = sorted(result, key=lambda x: x[0])  # no longer needed
    # stitch the per-worker column chunks back together, side by side
    return pd.concat(result, axis=1)


def my_func(x):
    return x.sum()


if __name__ == '__main__':
    df = pd.DataFrame([[1, 2, 3, 1], [1, 2, 3, 1], [4, 5, 6, 2], [7, 8, 9, 2]], columns=['A', 'B', 'C', 'cluster_id'])
    df = df.set_index('cluster_id')
    out = mp_apply(df, my_func)
    print(out)

Output:

             A   B   C
cluster_id            
1            2   4   6
2           11  13  15
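As a side note, the same mp_apply can be written with the pool as a context manager so the worker processes are always cleaned up; this is only a stylistic variant of the code above, not a change in logic:

def mp_apply(df, func):
    workers = 4
    split_dfs = np.array_split(df, workers, axis=1)
    # the pool is cleaned up automatically when the with-block exits
    with mp.Pool(processes=workers) as pool:
        result = pool.map(_apply_df, [(d, func) for d in split_dfs])
    return pd.concat(result, axis=1)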
answered Oct 17 '25 by sgDysregulation


