Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

pandas: optimizing my code (groupby() / apply())

Tags:

pandas

I have a dataframe of shape (RxC) 1.5M x 128. I do the following:

  1. I do groupby() based on 6 columns. This creates ~8700 sub-groups each of shape 538 x 122.
  2. On each sub-group, I run apply(). This function computes the % frequency of each categorical value PER column (i.e., 122) in the sub-group.

So my (pesudo) code:

<df = Read dataframe from file> g = df.groupby(grp_cols) g[nongrp_cols].apply(lambda d: d.apply(lambda s: s.value_counts()) / len(d.index))

The code is working OK for me so now I'm profiling it to improve performance. The apply() function takes about 20-25 minutes to run. I believe the problem is it is iterating over every column (122 times) for 8700 times (each subgroup) which may not be the best way (given the way I have coded it).

Can anyone recommend ways I can try to speed this up?

I tried using python multiprocessing pool (8 processes) to divide the subgroups into equal sets to process, but ended up getting some pickling error...

Thanks.

like image 793
user4979733 Avatar asked Jun 17 '15 23:06

user4979733


People also ask

How does pandas groupby apply work?

The GroupBy function in Pandas employs the split-apply-combine strategy meaning it performs a combination of — splitting an object, applying functions to the object and combining the results.

What are the three phases of the pandas groupby () function?

The “group by” process: split-apply-combine (1) Splitting the data into groups. (2). Applying a function to each group independently, (3) Combining the results into a data structure.

Is groupby faster on index pandas?

Although Groupby is much faster than Pandas GroupBy. apply and GroupBy. transform with user-defined functions, Pandas is much faster with common functions like mean and sum because they are implemented in Cython. The speed differences are not small.


1 Answers

pd.DataFrame.groupby.apply really gives us a lot of flexibility (unlike agg/filter/transform, it allows you to reshape each subgroup to any shape, in your case, from 538 x 122 to N_categories x 122). But it indeed comes with a cost: apply your flexible function one-by-one and lacks of vectorization.

I still think the way to solve it is to use multiprocessing. The pickle error you encounter is most likely because you define some functions inside your multi_processing_function. The rule is that you must move all functions on top levels. See the code below.

import pandas as pd
import numpy as np

# simulate your data with int 0 - 9 for categorical values
df = pd.DataFrame(np.random.choice(np.arange(10), size=(538, 122)))
# simulate your groupby operations, not so cracy with 8700 sub-groups, just try 800 groups for illustration
sim_keys = ['ROW' + str(x) for x in np.arange(800)]
big_data = pd.concat([df] * 800, axis=0, keys=sim_keys)
big_data.shape

big_data.shape
Out[337]: (430400, 122)

# Without multiprocessing
# ===================================================
by_keys = big_data.groupby(level=0)

sample_group = list(by_keys)[0][1]
sample_group.shape

def your_func(g):
    return g.apply(lambda s: s.value_counts()) / len(g.index)

def test_no_multiprocessing(gb, apply_func):
    return gb.apply(apply_func)

%time result_no_multiprocessing = test_no_multiprocessing(by_keys, your_func)

CPU times: user 1min 26s, sys: 4.03 s, total: 1min 30s
Wall time: 1min 27

Pretty slow here. Let's use multiprocessing module:

# multiprocessing for pandas dataframe apply
# ===================================================
# to void pickle error, must define functions at TOP level, if we move this function 'process' into 'test_with_multiprocessing', it raises a pickle error
def process(df):
    return df.groupby(level=0).apply(your_func)

def test_with_multiprocessing(big_data, apply_func):

    import multiprocessing as mp

    p = mp.Pool(processes=8)
    # split it into 8 chunks
    split_dfs = np.array_split(big_data, 8, axis=0)
    # define the mapping function, wrapping it to take just df as input
    # apply to each chunk
    df_pool_results = p.map(process, split_dfs)

    p.close()

    # combine together
    result = pd.concat(df_pool_results, axis=0)

    return result


%time result_with_multiprocessing = test_with_multiprocessing(big_data, your_func)

CPU times: user 984 ms, sys: 3.46 s, total: 4.44 s
Wall time: 22.3 s

Now, it's much faster, especially in CPU times. Although a bit overheads are there when we split and recombine the result, it expects to be about 4 - 6 times faster than non-multiprocessing case, when using a 8-core processor.

Finally, check whether two results are the same.

import pandas.util.testing as pdt

pdt.assert_frame_equal(result_no_multiprocessing, result_with_multiprocessing)

Pass the test beautifully.

like image 79
Jianxun Li Avatar answered Oct 16 '22 21:10

Jianxun Li