Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

parallelized algorithm for evaluating a 1-d array of functions on a same-length 1d numpy array

The upshot of the below is that I have an embarrassingly parallel for loop that I am trying to thread. There's a bit of rigamarole to explain the problem, but despite all the verbosity, I think this should be a rather trivial problem that the multiprocessing module is designed to solve easily.

I have a large length-N array of k distinct functions, and a length-N array of abcissa. Thanks to the clever solution provided by @senderle described in Efficient algorithm for evaluating a 1-d array of functions on a same-length 1d numpy array, I have a fast numpy-based algorithm that I can use to evaluate the functions at the abcissa to return a length-N array of ordinates:

def apply_indexed_fast(abcissa, func_indices, func_table):
    """ Returns the output of an array of functions evaluated at a set of input points 
    if the indices of the table storing the required functions are known. 

    Parameters 
    ----------
    func_table : array_like 
        Length k array of function objects

    abcissa : array_like 
        Length Npts array of points at which to evaluate the functions. 

    func_indices : array_like 
        Length Npts array providing the indices to use to choose which function 
        operates on each abcissa element. Thus func_indices is an array of integers 
        ranging between 0 and k-1. 

    Returns 
    -------
    out : array_like 
        Length Npts array giving the evaluation of the appropriate function on each 
        abcissa element. 
    """
    func_argsort = func_indices.argsort()
    func_ranges = list(np.searchsorted(func_indices[func_argsort], range(len(func_table))))
    func_ranges.append(None)
    out = np.zeros_like(abcissa)

    for i in range(len(func_table)):
        f = func_table[i]
        start = func_ranges[i]
        end = func_ranges[i+1]
        ix = func_argsort[start:end]
        out[ix] = f(abcissa[ix])

    return out

What I'm now trying to do is use multiprocessing to parallelize the for loop inside this function. Before describing my approach, for clarity I'll briefly sketch how the algorithm @senderle developed works. If you can read the above code and understand it immediately, just skip the next paragraph of text.

First we find the array of indices that sorts the input func_indices, which we use to define the length-k func_ranges array of integers. The integer entries of func_ranges control the function that gets applied to the appropriate sub-array of the input abcissa, which works as follows. Let f be the i^th function in the input func_table. Then the slice of the input abcissa to which we should apply the function f is slice(func_ranges[i], func_ranges[i+1]). So once func_ranges is calculated, we can just run a simple for loop over the input func_table and successively apply each function object to the appropriate slice, filling our output array. See the code below for a minimal example of this algorithm in action.

def trivial_functional(i): 
    def f(x):
        return i*x
    return f

k = 250
func_table = np.array([trivial_functional(j) for j in range(k)])

Npts = 1e6
abcissa = np.random.random(Npts)
func_indices = np.random.random_integers(0,len(func_table)-1,Npts)

result = apply_indexed_fast(abcissa, func_indices, func_table)

So my goal now is to use multiprocessing to parallelize this calculation. I thought this would be straightforward using my usual trick for threading embarrassingly parallel for loops. But my attempt below raises an exception that I do not understand.

from multiprocessing import Pool, cpu_count
def apply_indexed_parallelized(abcissa, func_indices, func_table):
    func_argsort = func_indices.argsort()
    func_ranges = list(np.searchsorted(func_indices[func_argsort], range(len(func_table))))
    func_ranges.append(None)
    out = np.zeros_like(abcissa)

    num_cores = cpu_count()
    pool = Pool(num_cores)

    def apply_funci(i):
        f = func_table[i]
        start = func_ranges[i]
        end = func_ranges[i+1]
        ix = func_argsort[start:end]
        out[ix] = f(abcissa[ix])

    pool.map(apply_funci, range(len(func_table)))
    pool.close()

    return out

result = apply_indexed_parallelized(abcissa, func_indices, func_table)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

I have seen this elsewhere on SO: Multiprocessing: How to use Pool.map on a function defined in a class?. One by one, I have tried each method proposed there; in all cases, I get a "too many files open" error because the threads were never closed, or the adapted algorithm simply hangs. This seems like there should be a simple solution since this is nothing more than threading an embarrassingly parallel for loop.

like image 592
aph Avatar asked Sep 19 '15 15:09

aph


People also ask

Is NumPy parallelized?

NumPy does not run in parallel. On the other hand Numba fully utilizes the parallel execution capabilities of your computer. NumPy functions are not going to use multiple CPU cores, never mind the GPU.

What is 1D NumPy array?

One dimensional array contains elements only in one dimension. In other words, the shape of the numpy array should contain only one value in the tuple. To create a one dimensional array in Numpy, you can use either of the array(), arange() or linspace() numpy functions.

How do you reshape a NumPy array in one direction?

Flattening array means converting a multidimensional array into a 1D array. We can use reshape(-1) to do this.


1 Answers

Warning/Caveat:

You may not want to apply multiprocessing to this problem. You'll find that relatively simple operations on large arrays, the problems will be memory bound with numpy. The bottleneck is moving data from RAM to the CPU caches. The CPU is starved for data, so throwing more CPUs at the problem doesn't help much. Furthermore, your current approach will pickle and make a copy of the entire array for every item in your input sequence, which adds lots of overhead.

There are plenty of cases where numpy + multiprocessing is very effective, but you need to make sure you're working with a CPU-bound problem. Ideally, it's a CPU-bound problem with relatively small inputs and outputs to alleviate the overhead of pickling the input and output. For many of the problems that numpy is most often used for, that's not the case.


Two Problems with Your Current Approach

On to answering your question:

Your immediate error is due to passing in a function that's not accessible from the global scope (i.e. a function defined inside a function).

However, you have another issue. You're treating the numpy arrays as though they're shared memory that can be modified by each process. Instead, when using multiprocessing the original array will be pickled (effectively making a copy) and passed to each process independently. The original array will never be modified.


Avoiding the PicklingError

As a minimal example to reproduce your error, consider the following:

import multiprocessing

def apply_parallel(input_sequence):
    def func(x):
        pass
    pool = multiprocessing.Pool()
    pool.map(func, input_sequence)
    pool.close()

foo = range(100)
apply_parallel(foo)

This will result in:

PicklingError: Can't pickle <type 'function'>: attribute lookup 
               __builtin__.function failed

Of course, in this simple example, we could simply move the function definition back into the __main__ namespace. However, in yours, you need it to refer to data that's passed in. Let's look at an example that's a bit closer to what you're doing:

import numpy as np
import multiprocessing

def parallel_rolling_mean(data, window):
    data = np.pad(data, window, mode='edge')
    ind = np.arange(len(data)) + window

    def func(i):
        return data[i-window:i+window+1].mean()

    pool = multiprocessing.Pool()
    result = pool.map(func, ind)
    pool.close()
    return result

foo = np.random.rand(20).cumsum()
result = parallel_rolling_mean(foo, 10)

There are multiple ways you could handle this, but a common approach is something like:

import numpy as np
import multiprocessing

class RollingMean(object):
    def __init__(self, data, window):
        self.data = np.pad(data, window, mode='edge')
        self.window = window

    def __call__(self, i):
        start = i - self.window
        stop = i + self.window + 1
        return self.data[start:stop].mean()

def parallel_rolling_mean(data, window):
    func = RollingMean(data, window)
    ind = np.arange(len(data)) + window

    pool = multiprocessing.Pool()
    result = pool.map(func, ind)
    pool.close()
    return result

foo = np.random.rand(20).cumsum()
result = parallel_rolling_mean(foo, 10)

Great! It works!


However, if you scale this up to large arrays, you'll soon find that it will either run very slow (which you can speed up by increasing chunksize in the pool.map call) or you'll quickly run out of RAM (once you increase the chunksize).

multiprocessing pickles the input so that it can be passed to separate and independent python processes. This means you're making a copy of the entire array for every i you operate on.

We'll come back to this point in a bit...


multiprocessing Does not share memory between processes

The multiprocessing module works by pickling the inputs and passing them to independent processes. This means that if you modify something in one process the other process won't see the modification.

However, multiprocessing also provides primitives that live in shared memory and can be accessed and modified by child processes. There are a few different ways of adapting numpy arrays to use a shared memory multiprocessing.Array. However, I'd recommend avoiding those at first (read up on false sharing if you're not familiar with it). There are cases where it's very useful, but it's typically to conserve memory, not to improve performance.

Therefore, it's best to do all modifications to a large array in a single process (this is also a very useful pattern for general IO). It doesn't have to be the "main" process, but it's easiest to think about that way.

As an example, let's say we wanted to have our parallel_rolling_mean function take an output array to store things in. A useful pattern is something similar to the following. Notice the use of iterators and modifying the output only in the main process:

import numpy as np
import multiprocessing

def parallel_rolling_mean(data, window, output):
    def windows(data, window):
        padded = np.pad(data, window, mode='edge')
        for i in xrange(len(data)):
            yield padded[i:i + 2*window + 1]

    pool = multiprocessing.Pool()
    results = pool.imap(np.mean, windows(data, window))
    for i, result in enumerate(results):
        output[i] = result
    pool.close()

foo = np.random.rand(20000000).cumsum()
output = np.zeros_like(foo)
parallel_rolling_mean(foo, 10, output)
print output

Hopefully that example helps clarify things a bit.


chunksize and performance

One quick note on performance: If we scale this up, it will get very slow very quickly. If you look at a system monitor (e.g. top/htop), you may notice that your cores are sitting idle most of the time.

By default, the master process pickles each input for each process and passes it in immediately and then waits until they're finished to pickle the next input. In many cases, this means that the master process works, then sits idle while the worker processes are busy, then the worker processes sit idle while the master process is pickling the next input.

The key is to increase the chunksize parameter. This will cause pool.imap to "pre-pickle" the specified number of inputs for each process. Basically, the master thread can stay busy pickling inputs and the worker processes can stay busy processing. The downside is that you're using more memory. If each input uses up a large amount of RAM, this can be a bad idea. If it doesn't, though, this can dramatically speed things up.

As a quick example:

import numpy as np
import multiprocessing

def parallel_rolling_mean(data, window, output):
    def windows(data, window):
        padded = np.pad(data, window, mode='edge')
        for i in xrange(len(data)):
            yield padded[i:i + 2*window + 1]

    pool = multiprocessing.Pool()
    results = pool.imap(np.mean, windows(data, window), chunksize=1000)
    for i, result in enumerate(results):
        output[i] = result
    pool.close()

foo = np.random.rand(2000000).cumsum()
output = np.zeros_like(foo)
parallel_rolling_mean(foo, 10, output)
print output

With chunksize=1000, it takes 21 seconds to process a 2-million element array:

python ~/parallel_rolling_mean.py  83.53s user 1.12s system 401% cpu 21.087 total

But with chunksize=1 (the default) it takes about eight times as long (2 minutes, 41 seconds).

python ~/parallel_rolling_mean.py  358.26s user 53.40s system 246% cpu 2:47.09 total

In fact, with the default chunksize, it's actually far worse than a single-process implementation of the same thing, which takes only 45 seconds:

python ~/sequential_rolling_mean.py  45.11s user 0.06s system 99% cpu 45.187 total
like image 95
Joe Kington Avatar answered Oct 05 '22 04:10

Joe Kington