
Parallelize loop over numpy rows

Tags: python, numpy, dask

I need to apply the same function to every row of a numpy array and store the results in a new numpy array.

import numpy as np

# states will hold the result of function applied to each row of array
states = np.empty_like(array)

for i, row in enumerate(array):
    states[i] = function(row, *args)

# do some other stuff on states

function does some non-trivial filtering of my data and returns an array that encodes where the conditions are True and where they are False. function can be either pure Python or compiled Cython. The filtering operations on the rows are complicated and can depend on previous values in the row, which means I can't operate on the whole array element-by-element.
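For illustration (this is a made-up stand-in, not the real filter), a function with this kind of dependence on earlier values in the row might look like:

import numpy as np

def function(row, threshold=0.5):
    # Hypothetical example: each output element depends on the previous
    # output element, so the row has to be processed sequentially.
    out = np.empty_like(row)
    out[0] = row[0]
    for j in range(1, len(row)):
        # keep the new value only if it jumps by more than threshold,
        # otherwise carry the previous value forward
        out[j] = row[j] if abs(row[j] - out[j - 1]) > threshold else out[j - 1]
    return out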

Is there a way to do something like this with dask, for example?

asked Sep 28 '15 by Max Linke



1 Answer

Dask solution

You could do this with dask.array by chunking the array by row, calling map_blocks, and then computing the result:

import dask.array as da

ar = ...  # the input numpy array
x = da.from_array(ar, chunks=(1, ar.shape[1]))  # one chunk per row
# note: each block has shape (1, ncols), so function receives a 2-D one-row block
x = x.map_blocks(function, *args)
states = x.compute()

By default this will use threads; you can use processes in the following way:

from dask.multiprocessing import get
states = x.compute(get=get)
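Note: newer dask releases replaced the get= keyword with a scheduler argument, so on a recent version the equivalent should be:

states = x.compute(scheduler="processes")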

Pool solution

However, dask is probably overkill for embarrassingly parallel computations like this; you could get by with a thread pool:

import numpy as np
from multiprocessing.pool import ThreadPool

pool = ThreadPool()

ar = ...  # the input numpy array
states = np.empty_like(ar)

def f(i):
    states[i] = function(ar[i], *args)

pool.map(f, range(len(ar)))
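Note that threads only give a speedup here if function releases the GIL, e.g. if it spends its time inside NumPy routines or in Cython code compiled with nogil; a pure-Python function will effectively run serially in a thread pool.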

And you could switch to processes with the following change:

from multiprocessing import Pool
pool = Pool()
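One caveat: processes don't share memory with the parent, so the in-place writes to states inside f above would be lost. A minimal sketch of a return-value variant (assuming function, ar, and args are picklable and defined at module level):

import numpy as np
from multiprocessing import Pool

def f(i):
    # return the result instead of mutating a shared array
    return function(ar[i], *args)

if __name__ == "__main__":
    with Pool() as pool:
        states = np.array(pool.map(f, range(len(ar))))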
answered Nov 03 '22 by MRocklin