
Efficiently applying a function to a grouped pandas DataFrame in parallel

I often need to apply a function to the groups of a very large DataFrame (of mixed data types) and would like to take advantage of multiple cores.

I can create an iterator from the groups and use the multiprocessing module, but it is not efficient because every group and the results of the function must be pickled for messaging between processes.
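For concreteness, the naive pattern described above looks something like this (a minimal sketch; my_func is a hypothetical per-group function):

import multiprocessing as mp
import pandas as pd

def my_func(group):
    # placeholder per-group computation
    return group.sum(numeric_only=True)

if __name__ == '__main__':
    df = pd.DataFrame({'key': [0, 1, 0, 1], 'val': [1.0, 2.0, 3.0, 4.0]})
    groups = [g for _, g in df.groupby('key')]   # each group is pickled on send
    with mp.Pool(4) as pool:
        results = pool.map(my_func, groups)      # each result is pickled on return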

Is there any way to avoid the pickling, or even to avoid copying the DataFrame entirely? It looks like the shared-memory functions of the multiprocessing module are limited to numpy arrays. Are there any other options?
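A minimal sketch of the numpy-only sharing referred to, assuming a fork-based start method so that child processes inherit the buffer:

import multiprocessing as mp
import numpy as np

# allocate a shared buffer of one million doubles and view it as a numpy array
shared = mp.RawArray('d', 1_000_000)
arr = np.frombuffer(shared, dtype=np.float64)
arr[:] = 0.0  # visible to forked children without any pickling

There is no comparable mechanism for a mixed-dtype DataFrame, which is the sticking point here.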

asked Jul 30 '12 by user2303

People also ask

Does Pandas apply work in parallel?

Pandas' apply(~) method runs on a single core, which means a single thread performs all of the work. If your machine has multiple cores, you can execute the apply(~) method in parallel by splitting the work across processes yourself or by using a library that does so.
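A common workaround, sketched here with a hypothetical row-wise function, is to split the frame into chunks and run apply on each chunk in a worker process:

import multiprocessing as mp
import numpy as np
import pandas as pd

def apply_chunk(chunk):
    # hypothetical row-wise function applied to one chunk
    return chunk.apply(lambda row: row['a'] + row['b'], axis=1)

if __name__ == '__main__':
    df = pd.DataFrame({'a': range(8), 'b': range(8)})
    chunks = np.array_split(df, 4)   # each chunk is pickled to a worker
    with mp.Pool(4) as pool:
        result = pd.concat(pool.map(apply_chunk, chunks))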

Is DF apply faster than Iterrows?

By using apply with axis=1, we can run a function on every row of a DataFrame. This approach still loops under the hood, but apply is better optimized than iterrows, which results in faster runtimes.
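A small illustration of the two approaches (a sketch on toy data; exact timings will vary):

import pandas as pd

df = pd.DataFrame({'a': range(1000), 'b': range(1000)})

# row-wise apply: one function call per row
s_apply = df.apply(lambda row: row['a'] + row['b'], axis=1)

# iterrows: constructs a Series object for every row, which adds overhead
s_iter = pd.Series(row['a'] + row['b'] for _, row in df.iterrows())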

What is the most efficient way to loop through DataFrames with Pandas?

Vectorization is always the first and best choice. You can also convert the DataFrame to a NumPy array or to dictionary format to speed up iteration. Iterating over dictionary key-value pairs turns out to be the fastest looping approach, with around a 280x speed-up for 20 million records.
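For example (a toy sketch; the 280x figure above comes from the quoted source, not from this data):

import pandas as pd

df = pd.DataFrame({'a': range(1000), 'b': range(1000)})

# vectorized: operates on whole columns at once; usually the fastest option
total_vec = (df['a'] + df['b']).sum()

# dictionary iteration: convert once, then loop over plain Python dicts
total_loop = sum(rec['a'] + rec['b'] for rec in df.to_dict('records'))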

How to apply functions to groups in a pandas DataFrame?

In this article, let's see how to apply functions to groups in a Pandas DataFrame. The steps to follow are: import the necessary libraries, set up the data as a Pandas DataFrame, and use the apply function to find statistical measures like rolling mean, average, sum, maximum, and minimum.

How do I run apply(~) in parallel in pandas?

To run apply(~) in parallel, use Dask, an easy-to-use library that performs Pandas operations in parallel by splitting the DataFrame into smaller partitions.
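A minimal sketch of that approach (the meta argument declares the output dtype so Dask can build its task graph lazily):

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'x': range(1_000_000)})

# split the frame into partitions that can be processed in parallel
ddf = dd.from_pandas(df, npartitions=8)

result = ddf['x'].apply(lambda v: v * 2, meta=('x', 'int64')).compute()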

What is the use of groupby in pandas?

Often data analysis requires data to be broken into groups in order to perform various operations on them. The GroupBy function in Pandas employs the split-apply-combine strategy: it splits an object, applies a function to each piece, and combines the results.
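For example:

import pandas as pd

df = pd.DataFrame({'team': ['x', 'y', 'x', 'y'], 'score': [1, 2, 3, 4]})

# split by 'team', apply the mean to each group, combine into one Series
means = df.groupby('team')['score'].mean()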

How to find statistical measures in a pandas DataFrame?

Set up the data as a Pandas DataFrame, then use the apply function to find statistical measures like rolling mean, average, sum, maximum, and minimum. You can use a lambda function for this.
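A minimal sketch with hypothetical column names:

import pandas as pd

df = pd.DataFrame({'grp': ['a', 'a', 'a', 'b', 'b', 'b'],
                   'val': [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]})

# per-group rolling mean with a window of 2, via a lambda
rolling_mean = df.groupby('grp')['val'].apply(lambda s: s.rolling(2).mean())

# per-group sum, maximum, and minimum
stats = df.groupby('grp')['val'].agg(['sum', 'max', 'min'])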




1 Answer

From the comments above, it seems that this is planned for pandas at some point (there's also an interesting-looking rosetta project which I just noticed).

However, until such parallel functionality is incorporated into pandas, I've noticed that it's very easy to write efficient, non-memory-copying parallel augmentations to pandas directly using Cython + OpenMP and C++.

Here's a short example of writing a parallel groupby-sum, whose usage looks something like this:

import pandas as pd
import para_group_demo

df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
print(para_group_demo.sum(df.a, df.b))

and the output is:

     sum
key
0      6
1     11
2      4
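For reference, the single-threaded pandas equivalent of this computation (modulo index naming) is:

print(df.groupby('a')['b'].sum())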

Note: doubtlessly, this simple example's functionality will eventually be part of pandas. Some things, however, will be more natural to parallelize in C++ for some time, and it's important to be aware of how easy it is to plug this into pandas.


To do this, I wrote a simple single-source-file extension whose code follows.

It starts with some imports and type definitions:

from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map

cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange

import pandas as pd

ctypedef unordered_map[int64_t, uint64_t] counts_t
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t
ctypedef vector[counts_t] counts_vec_t

The C++ unordered_map type is used for summing by a single thread, and the vector holds one such map per thread.

Now to the function sum. It starts off with typed memory views for fast access:

def sum(crit, vals):
    cdef int64_t[:] crit_view = crit.values
    cdef int64_t[:] vals_view = vals.values

The function continues by dividing the rows semi-equally among the threads (here hardcoded to 4), and having each thread sum the entries in its range:

    cdef uint64_t num_threads = 4
    cdef uint64_t l = len(crit)
    cdef uint64_t s = l / num_threads + 1   # chunk size per thread
    cdef uint64_t i, j, e
    cdef counts_vec_t counts
    counts = counts_vec_t(num_threads)
    counts.resize(num_threads)
    with cython.boundscheck(False):
        for i in prange(num_threads, nogil=True):
            # each thread sums its own [j, e) range into its own map
            j = i * s
            e = j + s
            if e > l:
                e = l
            while j < e:
                counts[i][crit_view[j]] += vals_view[j]
                inc(j)

When the threads have completed, the function merges all the results (from the different ranges) into a single unordered_map:

    cdef counts_t total
    cdef counts_it_t it, e_it
    for i in range(num_threads):
        it = counts[i].begin()
        e_it = counts[i].end()
        while it != e_it:
            total[deref(it).first] += deref(it).second
            inc(it)

All that's left is to create a DataFrame and return the results:

    key, sum_ = [], []
    it = total.begin()
    e_it = total.end()
    while it != e_it:
        key.append(deref(it).first)
        sum_.append(deref(it).second)
        inc(it)

    df = pd.DataFrame({'key': key, 'sum': sum_})
    df.set_index('key', inplace=True)
    return df
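A practical detail the code above leaves implicit is the build step: prange requires OpenMP, so the extension must be compiled and linked with the OpenMP flags. A minimal setup.py sketch (assuming a GCC-style compiler and that the source file is named para_group_demo.pyx) might look like this:

from setuptools import setup
from setuptools.extension import Extension
from Cython.Build import cythonize

ext = Extension(
    'para_group_demo',
    sources=['para_group_demo.pyx'],
    language='c++',                   # needed for unordered_map and vector
    extra_compile_args=['-fopenmp'],  # enable OpenMP so prange uses threads
    extra_link_args=['-fopenmp'],
)

setup(ext_modules=cythonize([ext]))

Running python setup.py build_ext --inplace then makes import para_group_demo work as in the usage example above.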
answered Sep 20 '22 by Ami Tavory