I am trying to do a groupby and apply operation on a pandas dataframe using multiprocessing (in the hope of speeding up my code). For example, if I have a dataframe like the following:
            A  B  C
cluster_id
1           1  2  3
1           1  2  3
2           4  5  6
2           7  8  9
I would like to apply a function on the columns and group them by the cluster_id. In a simple case where the function is just the sum
def my_func(x):
    return sum(x)
then the operation should yield:
             A   B   C
cluster_id
1            2   4   6
2           11  13  15
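In other words, the same result as the plain single-process version, which is what I am trying to speed up:

# single-process equivalent of the operation above
out = df.groupby(level=0).sum()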
There are some similar posts on SO; I did manage to get somewhere close, but haven't really solved it. My code fails and I don't know how to fix it. This is what I have come up with:
import multiprocessing as mp
import pandas as pd
import numpy as np

def _apply_df(args):
    df, func = args
    return df.groupby(level=0).apply(func)

def mp_apply(df, func):
    workers = 4
    pool = mp.Pool(processes=workers)
    split_dfs = np.array_split(df, workers, axis=1)
    result = pool.map(_apply_df, [(d, func) for d in split_dfs])
    pool.close()
    result = sorted(result, key=lambda x: x[0])
    return pd.concat([i[1] for i in result])

def my_func(x):
    return sum(x)

if __name__ == '__main__':
    df = pd.DataFrame([[1, 2, 3, 1], [1, 2, 3, 1], [4, 5, 6, 2], [7, 8, 9, 2]],
                      columns=['A', 'B', 'C', 'cluster_id'])
    df = df.set_index('cluster_id')
    out = mp_apply(df, my_func)
    print(out)
I am getting the error:
TypeError: unsupported operand type(s) for +: 'int' and 'str'
and it looks like it fails on the line:
result = pool.map(_apply_df, [(d, func) for d in split_dfs])
The parameter d that is passed into _apply_df looks to be empty.
Any help/ideas highly appreciated. I am using Python 3.6 if that matters. Thanks!
There are two main issues in your code.

The first is the use of Python's built-in sum in my_func. Iterating over a DataFrame yields its column labels (strings), not its values, so sum starts from 0 and immediately tries 0 + 'A'. You can reproduce the error directly:
sum(df.loc[1])
TypeError                                 Traceback (most recent call last)
<ipython-input-60-6dea0ab0880f> in <module>()
----> 1 sum(df.loc[1])

TypeError: unsupported operand type(s) for +: 'int' and 'str'
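A quick check confirms what the built-in sum is actually iterating over (using the df defined in the question):

# the built-in sum iterates over these labels, hence int + str
print(list(df.loc[1]))  # ['A', 'B', 'C']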
To fix this, you'll need to use pandas' own sum method, as shown below:
df.loc[1].sum()
#output
A 2
B 4
C 6
dtype: int64
As you can see, this produces the expected result, i.e. it sums the columns in the data slice.
The second issue is the "reduce" stage. Each process returns a single DataFrame, but the lines
result = sorted(result, key=lambda x: x[0])
return pd.concat([i[1] for i in result])
The first line will raise an error, because none of the result DataFrames has a column called 0; the second line has the same problem. Since each worker already returns the per-cluster sums for its own slice of columns, the partial results just need to be concatenated side by side:
return pd.concat(result, axis=1)
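To see what this reduce step does, here is a minimal sketch with hand-written partial results (mirroring the example data) showing how axis=1 concatenation aligns the slices on the shared cluster_id index:

import pandas as pd

# hypothetical partial results, as two workers might return them:
# each holds the per-cluster sums for its own slice of columns
idx = pd.Index([1, 2], name='cluster_id')
left = pd.DataFrame({'A': [2, 11]}, index=idx)
right = pd.DataFrame({'B': [4, 13], 'C': [6, 15]}, index=idx)

# axis=1 places the column slices side by side, rebuilding the full frame
print(pd.concat([left, right], axis=1))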
With these two changes, the code runs without issue on the data being used.
The overall code:
import multiprocessing as mp
import pandas as pd
import numpy as np

def _apply_df(args):
    # unpack the (dataframe-slice, function) tuple sent to the worker
    df, func = args
    return df.groupby(level=0).apply(func)

def mp_apply(df, func):
    workers = 4
    pool = mp.Pool(processes=workers)
    # split the frame column-wise, one slice per worker
    split_dfs = np.array_split(df, workers, axis=1)
    result = pool.map(_apply_df, [(d, func) for d in split_dfs])
    pool.close()
    #result = sorted(result, key=lambda x: x[0])
    return pd.concat(result, axis=1)

def my_func(x):
    return x.sum()

if __name__ == '__main__':
    df = pd.DataFrame([[1, 2, 3, 1], [1, 2, 3, 1], [4, 5, 6, 2], [7, 8, 9, 2]],
                      columns=['A', 'B', 'C', 'cluster_id'])
    df = df.set_index('cluster_id')
    out = mp_apply(df, my_func)
    print(out)
Output:
             A   B   C
cluster_id
1            2   4   6
2           11  13  15
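One caveat worth flagging: np.array_split(df, 4, axis=1) on a 3-column frame produces one empty slice, which is the empty d you observed while debugging. The fixed code tolerates it for this data, but a defensive tweak (an assumption about what you want, not a requirement) is to cap the worker count:

# cap workers at the number of columns so no slice is empty
workers = min(workers, df.shape[1])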