Progress indicator during pandas operations

People also ask

What is Progress_apply?

The progress_apply() method is provided by the tqdm package, which lets you create a progress meter and estimate “Time To Completion” for your iterations.

What does tqdm pandas do?

tqdm is a package for Python that enables you to instantly create progress bars and estimate TTC (Time To Completion) for your functions and loops!


Due to popular demand, I've added pandas support in tqdm (pip install "tqdm>=4.9.0"). Unlike the other answers, this will not noticeably slow pandas down -- here's an example for DataFrameGroupBy.progress_apply:

import pandas as pd
import numpy as np
from tqdm import tqdm
# from tqdm.auto import tqdm  # for notebooks

# Create new `pandas` methods which use `tqdm` progress
# (can use tqdm_gui, optional kwargs, etc.)
tqdm.pandas()

df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
# Now you can use `progress_apply` instead of `apply`
df.groupby(0).progress_apply(lambda x: x**2)

In case you're interested in how this works (and how to modify it for your own callbacks), see the examples on GitHub, the full documentation on PyPI, or import the module and run help(tqdm). Other supported functions include map, applymap, aggregate, and transform.
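
As a small illustration of the other registered methods, here is a minimal sketch using Series.progress_map together with an optional keyword argument passed to tqdm.pandas(). The toy data and the "squaring" label are made up for the example; the progress bar itself is written to stderr.

```python
import pandas as pd
from tqdm import tqdm

# Keyword arguments given here are forwarded to the underlying tqdm bar.
tqdm.pandas(desc="squaring")

# Hypothetical toy data, only for illustration.
values = pd.Series(range(5), name="n")

# progress_map mirrors Series.map but reports progress while it runs.
squared = values.progress_map(lambda x: x * x)
print(squared.tolist())
```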

EDIT


To directly answer the original question, replace:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

with:

from tqdm import tqdm
tqdm.pandas()
df_users.groupby(['userID', 'requestDate']).progress_apply(feature_rollup)

Note: for tqdm v4.8 and earlier, instead of tqdm.pandas() you had to do:

from tqdm import tqdm, tqdm_pandas
tqdm_pandas(tqdm())

To tweak Jeff's answer (and make it a reusable function):

def logged_apply(g, func, *args, **kwargs):
    step_percentage = 100. / len(g)
    import sys
    sys.stdout.write('apply progress:   0%')
    sys.stdout.flush()

    def logging_decorator(func):
        def wrapper(*args, **kwargs):
            progress = wrapper.count * step_percentage
            sys.stdout.write('\033[D \033[D' * 4 + format(progress, '3.0f') + '%')
            sys.stdout.flush()
            wrapper.count += 1
            return func(*args, **kwargs)
        wrapper.count = 0
        return wrapper

    logged_func = logging_decorator(func)
    res = g.apply(logged_func, *args, **kwargs)
    sys.stdout.write('\033[D \033[D' * 4 + format(100., '3.0f') + '%' + '\n')
    sys.stdout.flush()
    return res

Note: the apply progress percentage updates inline. If your function writes to stdout itself, this won't work.

In [11]: g = df_users.groupby(['userID', 'requestDate'])

In [12]: f = feature_rollup

In [13]: logged_apply(g, f)
apply progress: 100%
Out[13]: 
...

As usual you can add this to your groupby objects as a method:

from pandas.core.groupby import DataFrameGroupBy
DataFrameGroupBy.logged_apply = logged_apply

In [21]: g.logged_apply(f)
apply progress: 100%
Out[21]: 
...

As mentioned in the comments, this isn't a feature that core pandas would be interested in implementing. But Python allows you to create these for many pandas objects/methods (doing so would be quite a bit of work, although you should be able to generalise this approach).
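
One way to generalise the idea, as a rough sketch rather than anything pandas provides: keep the counting wrapper separate from the method it is passed to, so it can be handed to any call that invokes the function once per item. The name with_progress and the stream parameter are assumptions made for this example, and the built-in map stands in for a pandas method.

```python
import io
import sys

def with_progress(func, total, stream=sys.stdout):
    """Wrap func so each call rewrites a one-line percentage on stream."""
    def wrapper(*args, **kwargs):
        wrapper.count += 1
        percent = 100.0 * wrapper.count / total
        # '\r' returns to the start of the line so the text is overwritten.
        stream.write('\rprogress: {:3.0f}%'.format(percent))
        stream.flush()
        return func(*args, **kwargs)
    wrapper.count = 0
    return wrapper

# Usage with the built-in map; anything that calls func once per item works.
items = [1, 2, 3, 4]
buffer = io.StringIO()  # stand-in for the terminal, so the output can be inspected
doubled = list(map(with_progress(lambda x: 2 * x, len(items), buffer), items))
print(doubled)
```

The same wrapped function could be passed to g.apply or a similar method, provided total matches the number of calls that method makes.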


This is for anyone looking to apply tqdm to their custom parallel pandas-apply code.

(I tried some of the libraries for parallelization over the years, but I never found a 100% parallelization solution, mainly for the apply function, and I always had to come back to my "manual" code.)

df_multi_core - this is the one you call. It accepts:

  1. Your df object
  2. The function name you'd like to call
  3. The subset of columns the function can be performed upon (helps reduce time / memory)
  4. The number of jobs to run in parallel (-1 or omit for all cores)
  5. Any other kwargs the df's function accepts (like "axis")

_df_split - this is an internal helper function that has to be defined at module level (Pool.map is "placement dependent"); otherwise I'd define it inside df_multi_core.
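
The placement requirement comes from pickling: a process pool sends the callable to its workers with pickle, and pickle stores functions by their importable name. A minimal sketch of the difference, using made-up helper names (can_pickle, make_local_adder) and operator.add as a stand-in for a module-level function:

```python
import operator
import pickle
from functools import partial

def can_pickle(obj):
    """Return True if obj survives pickle.dumps, which a process pool relies on."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False

def make_local_adder(n):
    # add_n is defined inside another function, so it has no importable name.
    def add_n(x):
        return x + n
    return add_n

importable = partial(operator.add, 1)  # built from a module-level function
nested = make_local_adder(1)           # a closure created at call time

print(can_pickle(importable))
print(can_pickle(nested))
```

This is also why wrapping a module-level helper in functools.partial is a workable way to pass extra keyword arguments through the pool.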

Here's the code from my gist (I'll add more pandas function tests there):

import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

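    # Split only the requested columns; with no subset, split the whole frame.
    # (df[None] raises KeyError on current pandas, so test for None explicitly.)
    data = df if subset is None else df[subset]
    splits = np.array_split(data, njobs)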

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()
    results = sorted(results, key=lambda x:x[0])
    results = pd.concat([split[1] for split in results])
    return results
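
The split_ind tag and the final sort keep the pieces in their original order. Pool.map already returns results in input order, so the tag mainly matters if you switch to an unordered call such as imap_unordered. Here is a minimal sketch of that pattern, assuming made-up chunk data and using a thread pool so it runs without any pickling concerns:

```python
from multiprocessing.pool import ThreadPool

def process_chunk(tagged_chunk):
    """Return (index, result) so the caller can restore the original order."""
    index, chunk = tagged_chunk
    return index, [x + 1 for x in chunk]

chunks = [[0, 1], [2, 3], [4, 5]]  # hypothetical pre-split data
tagged = list(enumerate(chunks))

with ThreadPool(processes=3) as pool:
    # imap_unordered yields results as they finish, in no guaranteed order.
    results = list(pool.imap_unordered(process_chunk, tagged))

# Sort by the tag, then stitch the pieces back together.
results.sort(key=lambda pair: pair[0])
combined = [x for _, part in results for x in part]
print(combined)
```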

Below is test code for a parallelized apply with tqdm "progress_apply".

from time import time
from tqdm import tqdm
tqdm.pandas()

if __name__ == '__main__': 
    sep = '-' * 50

    # tqdm progress_apply test      
    def apply_f(row):
        return row['c1'] + 0.1
    N = 1000000
    np.random.seed(0)
    df = pd.DataFrame({'c1': np.arange(N), 'c2': np.arange(N)})

    print('testing pandas apply on {}\n{}'.format(df.shape, sep))
    t1 = time()
    res = df.progress_apply(apply_f, axis=1)
    t2 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    # res = df_multi_core(df=df, df_f_name='apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    res = df_multi_core(df=df, df_f_name='progress_apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    t4 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))

In the output you can see one progress bar when running without parallelization, and per-core progress bars when running with parallelization. There is a slight hiccup, and sometimes the bars for the remaining cores appear all at once, but even then I think it's useful since you get the progress stats per core (it/sec and total records, for example).


Thank you @abcdaa for this great library!


In case you need to use this in a Jupyter/IPython notebook, as I did, here's the approach from a helpful guide and the article it draws on:

from tqdm._tqdm_notebook import tqdm_notebook
import numpy as np
import pandas as pd
tqdm_notebook.pandas()
df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
df.groupby(0).progress_apply(lambda x: x**2)

Note the underscore in the import statement for _tqdm_notebook. As the referenced article mentions, development is in late beta stage.

UPDATE as of 11/12/2021

I'm currently using pandas==1.3.4 and tqdm==4.62.3. I'm not sure in which version the tqdm authors made this change, but the above import statement is deprecated. Instead use:

from tqdm.notebook import tqdm_notebook

You can easily do this with a decorator

from functools import wraps

def logging_decorator(func):

    @wraps(func)
    def wrapper(*args, **kwargs):
        wrapper.count += 1
        print("The function I modify has been called {0} time(s).".format(
              wrapper.count))
        # Return the result so that apply() still receives the function's output
        return func(*args, **kwargs)
    wrapper.count = 0
    return wrapper

modified_function = logging_decorator(feature_rollup)

Then just use modified_function in place of the original (and change when you want it to print).
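
For reference, functools.wraps takes the original function as its argument and copies its name and docstring onto the wrapper. A minimal sketch, where counted and feature_rollup_demo are hypothetical names standing in for the decorator and for feature_rollup:

```python
from functools import wraps

def counted(func):
    """Count calls to func while keeping its name and docstring."""
    @wraps(func)  # wraps needs the original function as its argument
    def wrapper(*args, **kwargs):
        wrapper.count += 1
        return func(*args, **kwargs)
    wrapper.count = 0
    return wrapper

def feature_rollup_demo(group):
    """Hypothetical stand-in for feature_rollup."""
    return len(group)

wrapped = counted(feature_rollup_demo)
sizes = [wrapped(g) for g in ([1, 2], [3], [4, 5, 6])]
print(wrapped.__name__, wrapped.count, sizes)
```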


Every answer here used pandas.DataFrame.groupby. If you want a progress bar on pandas.Series.apply without a groupby, here's how you can do it inside a jupyter-notebook:

from tqdm.notebook import tqdm
tqdm.pandas()


df['<applied-col-name>'] = df['<col-name>'].progress_apply(<your-manipulation-function>)
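
A concrete version of the template above, as a small sketch: the column names and the use of len are invented for illustration, and tqdm.auto is used here so the same snippet also runs outside a notebook.

```python
import pandas as pd
from tqdm.auto import tqdm  # picks the notebook or console bar automatically

tqdm.pandas()

# Hypothetical column names and function, for illustration only.
df = pd.DataFrame({"name": ["ada", "grace", "linus"]})
df["name_length"] = df["name"].progress_apply(len)
print(df["name_length"].tolist())
```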

I've changed Jeff's answer to include a total, so that you can track progress, and a variable to print only every X iterations (this improves performance considerably if print_at is reasonably high):

import sys

def count_wrapper(func, total, print_at):

    def wrapper(*args):
        wrapper.count += 1
        # Only redraw every print_at calls to keep the overhead low
        if wrapper.count % wrapper.print_at == 0:
            clear_output()
            sys.stdout.write("%d / %d" % (wrapper.count, wrapper.total))
            sys.stdout.flush()
        return func(*args)
    wrapper.count = 0
    wrapper.total = total
    wrapper.print_at = print_at

    return wrapper

The clear_output() function comes from IPython:

from IPython.display import clear_output

If you're not on IPython, Andy Hayden's answer does the same thing without it.