Python: ProcessPoolExecutor vs ThreadPoolExecutor

I have the following function that randomly shuffles the values of one column of the dataframe and then uses a fitted RandomForestClassifier to predict on the whole dataframe, including the shuffled column, to get an accuracy score.

I would like to run this function concurrently for each column of the dataframe, as the dataframe is pretty large: 500k rows and 1k columns. The key is to randomly shuffle only one column at a time.

However, I am struggling to understand why ProcessPoolExecutor is much slower than ThreadPoolExecutor. I thought ThreadPoolExecutor was only supposed to be faster for I/O tasks, and in this case the function doesn't involve reading from or writing to any files.

Or have I done something wrong here? Is there a more efficient or better way to optimize this code so it runs concurrently and finishes faster?

import concurrent.futures

import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score
from sklearn.model_selection import KFold

def randomShuffle(colname, X, y, fit):
    out = {'col_name': colname}
    X_ = X.copy(deep=True)
    np.random.shuffle(X_[colname].values)  # permutation of a single column
    pred = fit.predict(X_)
    out['acc_scr'] = accuracy_score(y, pred)
    return out

def runConcurrent(classifier, X, y):
    skf = KFold(n_splits=5, shuffle=False)
    acc_scr0, acc_scr1 = pd.Series(dtype=float), pd.DataFrame(columns=X.columns)
    # split data into training and validation
    for i, (train_idx, val_idx) in enumerate(skf.split(X, y)):
        X_train, y_train = X.iloc[train_idx, :], y.iloc[train_idx]
        X_val, y_val = X.iloc[val_idx, :], y.iloc[val_idx]

        fit = classifier.fit(X=X_train, y=y_train)
        # accuracy score on the unshuffled validation set
        pred = fit.predict(X_val)
        acc_scr0.loc[i] = accuracy_score(y_val, pred)

        # with concurrent.futures.ProcessPoolExecutor() as executor:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            results = [executor.submit(randomShuffle, colname=j, X=X_val, y=y_val, fit=fit) for j in X.columns]
            for res in concurrent.futures.as_completed(results):
                acc_scr1.loc[i, res.result()['col_name']] = res.result()['acc_scr']
    return acc_scr0, acc_scr1
asked Sep 16 '25 by user1769197
1 Answer

It is hard to say without testing, since multiprocessing performance depends on a lot of things. Communication overhead matters, so if you need to send a lot of data around it is slow, but the number of tasks created is important too.

Creating a task has quite some overhead, which has to be seen in relation to how long the called method takes to return. If a method only takes a fraction of a second to finish and you call it thousands of times, the overhead of creating tasks is significant. If, on the other hand, the function takes several seconds to return, the overhead is negligible.
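
As a minimal, self-contained sketch of that effect (this is not from your code, and timings are machine-dependent): a function that returns almost instantly is dominated by task overhead, and a larger chunksize amortizes it.

import time
from concurrent.futures import ProcessPoolExecutor

def tiny(x):
    return x + 1  # returns almost instantly, so task overhead dominates

if __name__ == "__main__":
    for chunksize in (1, 500):
        start = time.perf_counter()
        with ProcessPoolExecutor() as executor:
            # chunksize=1 creates one task per element; 500 creates far fewer
            list(executor.map(tiny, range(10_000), chunksize=chunksize))
        print(f"chunksize={chunksize}: {time.perf_counter() - start:.2f}s")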

I can't really tell how fast randomShuffle is, but what you can do is use the map function with a chunksize and see if that speeds anything up.

from concurrent.futures import ProcessPoolExecutor
from functools import partial

...

with ProcessPoolExecutor() as executor:
    # guard against a zero chunksize when there are few columns
    chunksize = max(1, len(X.columns) // (executor._max_workers * 4))
    randomShuffleWrapper = partial(randomShuffle, X=X_val, y=y_val, fit=fit)
    results = list(executor.map(randomShuffleWrapper, X.columns, chunksize=chunksize))

The only thing that changes across calls to randomShuffle is the colname, so create a partial function that fixes all the other parameters; your new function then takes only the colname as its first argument. Now we also have to set an appropriate chunksize.

The chunksize is a bit of a hyperparameter: there is no generally good value, and you may need to try different ones to find the best. It creates chunks of your iterable and wraps your function, so that one task computes the outputs for all entries of a chunk.

So if you have 1000 entries and a chunksize of 100, only 10 tasks are created, and every task computes 100 entries. This leads to way less overhead from creating and finishing tasks.
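
As a quick sanity check on that arithmetic (the round-up accounts for a smaller final chunk):

import math

entries, chunksize = 1000, 100
n_tasks = math.ceil(entries / chunksize)
print(n_tasks)  # 10 tasks, each handling 100 entries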

As a starting point, I would use what multiprocessing.pool.Pool does when no chunksize is given: it derives one from the length of the iterable and the number of workers. ProcessPoolExecutor.map() instead defaults the chunksize to 1, which basically amounts to what you are already doing: creating a task for every element.
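
For reference, Pool's default-chunksize heuristic looks roughly like this (paraphrased from CPython's Pool._map_async; details may vary between versions):

def default_chunksize(n_items, n_workers):
    # Pool divides the work into roughly 4 chunks per worker
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize

print(default_chunksize(1000, 8))  # -> 32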

I have no idea how big the things you are passing to the function are, namely X=X_val, y=y_val and fit=fit. If they are big, there will be a lot of communication overhead, since they will always be serialized, sent over, and deserialized. So check whether they are big and whether they have to be. You normally want to send only what is absolutely necessary, and the same goes for the return value of the function: it should be as small as possible.
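
One way to cut that cost, as a sketch under the assumption that X_val, y_val and fit are read-only during a fold: ship them once per worker via the initializer/initargs arguments (available since Python 3.7) instead of with every task. The names _shared, _init_worker and randomShuffleByName are hypothetical, not from your code.

from concurrent.futures import ProcessPoolExecutor

_shared = {}  # per-worker storage, filled once when the worker starts

def _init_worker(X_val, y_val, fit):
    # runs once in each worker process, so the big objects are
    # pickled once per worker instead of once per task
    _shared.update(X=X_val, y=y_val, fit=fit)

def randomShuffleByName(colname):
    return randomShuffle(colname, _shared['X'], _shared['y'], _shared['fit'])

with ProcessPoolExecutor(initializer=_init_worker,
                         initargs=(X_val, y_val, fit)) as executor:
    results = list(executor.map(randomShuffleByName, X.columns, chunksize=chunksize))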

Follow-up question from the asker: so this is why you propose using chunksize to chop things up. Is my understanding correct?

...

One other question: say I split the column names into 4 chunks, does that mean 4 processes will be created for these 4 chunks? And how is the data processed within each chunk: a for loop, or multiprocessing/multithreading?

So let me explain a bit more what the chunksize actually does, since it is quite simple and can be seen directly in the code. I am going to reference the code found in Anaconda Python 3.9, python3.9/concurrent/futures/process.py.

The ProcessPoolExecutor class has the following code:

class ProcessPoolExecutor(_base.Executor):
    ...
    def map(self, fn, *iterables, timeout=None, chunksize=1):
        ...
        results = super().map(partial(_process_chunk, fn),
                              _get_chunks(*iterables, chunksize=chunksize),
                              timeout=timeout)

The _get_chunks function just divides the iterables into equal parts of size chunksize, plus possibly a smaller final part if the length of the iterables is not divisible by chunksize.
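
In CPython it looks essentially like this (from concurrent/futures/process.py; it lazily slices the zipped iterables):

import itertools

def _get_chunks(*iterables, chunksize):
    """Iterates over zip()ed iterables in chunks."""
    it = zip(*iterables)
    while True:
        chunk = tuple(itertools.islice(it, chunksize))
        if not chunk:
            return
        yield chunk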

partial(_process_chunk, fn) creates a partial function of _process_chunk, which looks like this:

def _process_chunk(fn, chunk):
    return [fn(*args) for args in chunk]

So all it does is iterate over every element of a chunk and call the function, in your case randomShuffle. That means one task does not consist of one call to randomShuffle but of chunksize many calls. All results are collected in a list and later combined.
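
As a concrete illustration of what a single task then executes (score_column is a hypothetical stand-in for your wrapped randomShuffle):

def _process_chunk(fn, chunk):
    return [fn(*args) for args in chunk]

def score_column(colname):
    # hypothetical stand-in returning a dummy score
    return {'col_name': colname, 'acc_scr': 0.5}

chunk = (('col_a',), ('col_b',), ('col_c',))  # one chunk of argument tuples
print(_process_chunk(score_column, chunk))  # three calls, one task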

The super().map() call means the map function from the parent class Executor is used:

class Executor(object):
    ...
    def map(self, fn, *iterables, timeout=None, chunksize=1):
        ...
        fs = [self.submit(fn, *args) for args in zip(*iterables)]
        ...

As you can see, at this point submit is simply called once per chunk. The fn here is the partial function created earlier, partial(_process_chunk, fn), and iterables is what _get_chunks(*iterables, chunksize=chunksize) returned: the equally-sized chunks of the original iterables. So all the map function of the ProcessPoolExecutor does is wrap your function and divide your iterables into chunks for you before submit is called.

All of this is done with the goal of reducing the number of tasks created (submit calls) by making each task do more, in this case calling the given function for every element of a chunk.

So how do tasks actually map to processes? By creating a ProcessPoolExecutor, you create a pool of processes; the number is defined by the number of cores on your system, or by whatever you pass as the max_workers argument.

When submit is called, a worker is assigned to the task; the worker receives all the data needed to run the function and returns the function's output to the main process. This data transfer is done by serializing and deserializing the data, usually with the pickle module. This is also where a lot of overhead comes from, since transferring data between processes is slow.
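
A rough way to gauge that per-task cost, assuming X_val, y_val and fit exist in scope as in your code:

import pickle

# approximate bytes shipped to a worker for one task's arguments
payload = pickle.dumps((X_val, y_val, fit))
print(f"~{len(payload) / 1e6:.1f} MB serialized per task")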

So if you created a ProcessPoolExecutor with max_workers=10, you can in theory have 10 tasks executed in parallel (provided you have 10 cores, of course). The abstraction of the pool and tasks exists so that you do not have to worry about which task runs where: you just submit everything that has to be done and let the ProcessPoolExecutor figure out how best to assign tasks to processes.
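
A small runnable sketch of that division of labor, printing which pool processes handled the tasks (worker count and pids differ per run):

import os
from concurrent.futures import ProcessPoolExecutor

def which_worker(_):
    return os.getpid()  # identify the pool process that ran this task

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        pids = set(executor.map(which_worker, range(100)))
    print(f"100 tasks ran across {len(pids)} worker process(es)")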

answered Sep 19 '25 by Nopileos