I need to apply the same function onto every row in a numpy array and store the result again in a numpy array.
# states will contain results of function applied to a row in array
states = np.empty_like(array)
for i, ar in enumerate(array):
    states[i] = function(ar, *args)
# do some other stuff on states
function does some non-trivial filtering of my data and returns an array marking where the conditions are True and where they are False. function can be either pure Python or Cython-compiled. The filtering operations on the rows are complicated and can depend on previous values in the row, which means I can't operate on the whole array in an element-by-element fashion.
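To make the setup concrete, here is a minimal sketch of a row function whose output depends on earlier values in the same row. The name hysteresis_row, the thresholds and the sample data are all hypothetical stand-ins; the real function is not shown in the question.

```python
import numpy as np

def hysteresis_row(row, low, high):
    """Hypothetical stand-in for `function`: a small state machine over one row.

    The state switches on when a value rises above `high` and switches off
    only when a value falls below `low`, so each output element depends on
    the previous elements of the same row.
    """
    out = np.empty(row.shape, dtype=bool)
    active = False
    for j, value in enumerate(row):
        if value > high:
            active = True
        elif value < low:
            active = False
        out[j] = active
    return out

# illustrative data and arguments (assumptions, not from the question)
array = np.array([[0.0, 0.9, 0.5, 0.1, 0.5],
                  [0.5, 0.5, 0.95, 0.6, 0.05]])
args = (0.2, 0.8)  # low and high thresholds

# same loop as in the question, with a boolean result array
states = np.empty(array.shape, dtype=bool)
for i, ar in enumerate(array):
    states[i] = hysteresis_row(ar, *args)
```

Each row is processed independently of the others, which is what makes the outer loop a candidate for parallel execution even though the inner loop is sequential.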
Is there a way to do something like this in dask for example?
Most NumPy functions run on a single CPU core, and NumPy itself does not use the GPU. Numba, on the other hand, can compile loops so that they run in parallel across the cores of your computer.
The exception is linear algebra: when NumPy is linked against a multithreaded BLAS library it uses several threads by default, so parallelizing a Python function that calls NumPy may create a huge number of threads.
Various Python packages such as NumPy, SciPy and pandas can run on multiple CPUs when the compiled libraries they depend on were built with OpenMP or another threading backend.
First, we create two Process objects and assign them the function they will execute when they start running, also known as the target function. Second, we tell the processes to go ahead and run their tasks. And third, we wait for the processes to finish running, then continue with our program.
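The three steps described above can be sketched roughly as follows. The function names and the input lists are made up for illustration, and a Queue is just one possible way to get results back to the parent process.

```python
import multiprocessing as mp

def sum_of_squares(values):
    """Hypothetical piece of work done by each process."""
    return sum(v * v for v in values)

def worker(values, queue):
    """Target function: compute a result and send it back to the parent."""
    queue.put(sum_of_squares(values))

def run_two_processes():
    queue = mp.Queue()
    # 1. create two Process objects and give each one its target function
    p1 = mp.Process(target=worker, args=([1, 2, 3], queue))
    p2 = mp.Process(target=worker, args=([4, 5, 6], queue))
    # 2. tell the processes to start running
    p1.start()
    p2.start()
    # read both results before joining so the queue is drained first
    results = sorted(queue.get() for _ in range(2))
    # 3. wait for both processes to finish, then continue
    p1.join()
    p2.join()
    return results

if __name__ == "__main__":
    print(run_two_processes())
```

The main guard matters on platforms where child processes re-import the main module, since without it each child would try to start processes of its own.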
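You could do this with dask.array by chunking the array by row, calling map_blocks, then computing the result:
import dask.array as da
ar = ...
x = da.from_array(ar, chunks=(1, ar.shape[1]))  # one row per chunk
# each block has shape (1, ncols), so function must accept and return such a block
x = x.map_blocks(function, *args)
states = x.compute()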
By default this will use threads. You can use processes in the following way:
# older dask releases spelled this x.compute(get=dask.multiprocessing.get)
states = x.compute(scheduler="processes")
However, dask is probably overkill for an embarrassingly parallel computation like this. You could get by with a thread pool:
import numpy as np
from multiprocessing.pool import ThreadPool

pool = ThreadPool()
ar = ...
states = np.empty_like(ar)

def f(i):
    # each task writes only its own row, so the threads never write to the same memory
    states[i] = function(ar[i], *args)

pool.map(f, range(len(ar)))
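For reference, a self-contained version of the thread pool approach might look like the sketch below. The row function running_max, the helper apply_rows_threaded and the sample array are hypothetical and only there so the snippet runs on its own.

```python
import numpy as np
from multiprocessing.pool import ThreadPool

def running_max(row):
    """Hypothetical row function: running maximum along one row."""
    return np.maximum.accumulate(row)

def apply_rows_threaded(func, array, n_threads=4):
    """Apply func to every row of array using a pool of threads."""
    states = np.empty_like(array)

    def work(i):
        # threads share memory, so writing into states is visible to the caller
        states[i] = func(array[i])

    with ThreadPool(n_threads) as pool:
        pool.map(work, range(len(array)))
    return states

array = np.array([[3, 1, 4, 1, 5],
                  [2, 7, 1, 8, 2]])
states = apply_rows_threaded(running_max, array)
```

Threads only give a speedup when the row function releases the GIL for most of its run time, as many NumPy calls and Cython code marked nogil do. A pure Python row function will run correctly here but not faster.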
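And you could switch to processes with the following change. Worker processes do not share states with the parent, so f has to return each row and the parent has to collect the results:
from multiprocessing import Pool
pool = Pool()
# f must now return function(ar[i], *args) instead of writing into states
states = np.array(pool.map(f, range(len(ar))))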
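A complete process-based variant could be sketched like this. The row function running_sum and the helper apply_rows_in_processes are hypothetical names, and the sample array is only illustrative.

```python
import numpy as np
from multiprocessing import Pool

def running_sum(row):
    """Hypothetical row function, defined at module level so it can be pickled."""
    return np.cumsum(row)

def apply_rows_in_processes(func, array, n_procs=2):
    """Apply func to every row of array in worker processes."""
    with Pool(n_procs) as pool:
        # each row is pickled, sent to a worker, and the result is sent back
        rows = pool.map(func, list(array))
    return np.vstack(rows)

if __name__ == "__main__":
    array = np.arange(12).reshape(3, 4)
    states = apply_rows_in_processes(running_sum, array)
    print(states)
```

Processes avoid the GIL, so this also helps with pure Python row functions, at the cost of copying every row to a worker and every result back. For cheap row functions that overhead can outweigh the gain.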