I intend to parallelize the for-loop in the Python code below, which handles large data arrays. How well does parallelization over threads, cores, or nodes suit this code, and how should I implement it? Any advice is appreciated. Thanks!
All the inputs are NumPy arrays with the following typical sizes:
vector_data (int64): 1M x 3
matrix (float64): 0.1M x 0.1M x 3
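Before choosing a strategy, it can help to check how much memory these typical sizes imply. The sketch below assumes dense arrays and also counts pop_array, which (like every per-vector result) is N × N:

```python
import numpy as np

# Typical sizes quoted in the question
n_vectors = 1_000_000   # vector_data: 1M x 3, int64
n_matrix = 100_000      # matrix: 0.1M x 0.1M x 3, float64

vector_bytes = n_vectors * 3 * np.dtype(np.int64).itemsize
matrix_bytes = n_matrix * n_matrix * 3 * np.dtype(np.float64).itemsize
pop_array_bytes = n_matrix * n_matrix * np.dtype(np.float64).itemsize  # one N x N result

for name, nbytes in [("vector_data", vector_bytes),
                     ("matrix", matrix_bytes),
                     ("pop_array", pop_array_bytes)]:
    print(f"{name:12s} {nbytes / 1e9:10.3f} GB")
```

At these sizes matrix alone is about 240 GB and pop_array about 80 GB, more than many single machines provide, so memory per worker matters as much as CPU count.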
Edit, based on the answers to this post:
Run-time tests suggest that multiprocessing results in a significant slowdown and also has higher memory requirements.
from timeit import timeit
from multiprocessing import Pool
import numpy as np
from numba import jit

def OP():
    N = len(matrix_data)
    pop_array = np.zeros((N, N))
    for vector in vector_data:
        vector_2 = np.dot(vector, vector)
        pop_array += (np.exp(-vector_2) / vector_2
                      * np.cos(np.tensordot(matrix_data, vector, axes=([2], [0]))))
    return pop_array

def worker(vector):
    vector_2 = np.dot(vector, vector)
    return (np.exp(-vector_2) / vector_2
            * np.cos(np.tensordot(matrix_data, vector, axes=([2], [0]))))

def f1():
    N = len(matrix_data)
    pop_array = np.zeros((N, N))
    with Pool() as pool:
        results = pool.map(worker, vector_data)
    for res in results:
        pop_array += res
    return pop_array

def f2():
    N = len(matrix_data)
    pop_array = np.zeros((N, N))
    with Pool() as pool:
        for result in pool.imap(worker, vector_data):
            pop_array += result
    return pop_array
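# NOTE: without a leading "@", this line only creates a decorator and discards it,
# so f3 is never compiled by Numba and runs as plain Python, just like OP.
jit(parallel=True)
def f3():
    N = len(matrix_data)
    pop_array = np.zeros((N, N))
    for vector in vector_data:
        vector_2 = np.dot(vector, vector)
        pop_array += (np.exp(-vector_2) / vector_2
                      * np.cos(np.tensordot(matrix_data, vector, axes=([2], [0]))))
    return pop_array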
max_vector_index = 150
vector_size = int(1E3)
matrix_size = int(1E2)
vector_shape = vector_size, 3
matrix_shape = matrix_size, matrix_size, 3
vector_data = np.random.randint(-max_vector_index, max_vector_index+1, vector_shape)
matrix_data = np.random.random(matrix_shape)
print(f'OP: {timeit(OP, number=10):.3e} sec')
print(f'f1: {timeit(f1, number=10):.3e} sec')
print(f'f2: {timeit(f2, number=10):.3e} sec')
print(f'f3: {timeit(f3, number=10):.3e} sec')
The following are the run-time and memory costs from sample runs (factors in parentheses are relative to OP):
vector_size = int(1E2), matrix_size = int(1E1)
  Run time:
    OP: 9.527e-02 sec
    f1: 2.402e+00 sec (25.21x)
    f2: 2.269e+00 sec (23.82x)
    f3: 3.414e-02 sec (0.36x)
  Memory:
    OP: 43.0 MiB
    f1: 41.9 MiB (0.97x)
    f2: 41.9 MiB (0.97x)

vector_size = int(1E3), matrix_size = int(1E2)
  Run time:
    OP: 1.420e+00 sec
    f1: 1.448e+01 sec (10.20x)
    f2: 2.051e+01 sec (14.44x)
    f3: 1.213e+00 sec (0.86x)
  Memory:
    OP: 43.4 MiB
    f1: 119.0 MiB (2.74x)
    f2: 43.8 MiB (1x)

vector_size = int(1E4), matrix_size = int(1E3)
  Run time:
    OP: 5.116e+02 sec
    f1: 8.902e+02 sec (1.74x)
    f2: 6.509e+02 sec (1.27x)
  Memory:
    OP: 73.9 MiB
    f1: 76402.1 MiB (1033x)
    f2: 209.7 MiB (2.84x)
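As a rough check on the largest run: each worker call returns a dense N × N float64 array, and pool.map in f1 collects all of them into a list before the summation starts, whereas imap in f2 hands results over as they are produced. Assuming that list is where the memory goes, its expected size is:

```python
# Sizes from the largest benchmark run
vector_size = int(1E4)
matrix_size = int(1E3)

# Each worker call returns one dense N x N float64 array (8 bytes per element)
bytes_per_result = matrix_size * matrix_size * 8

# pool.map keeps all results in a list before f1 sums them
total_bytes = vector_size * bytes_per_result
print(f"{total_bytes / 2**20:.1f} MiB")  # 76293.9 MiB
```

That is close to the 76402.1 MiB measured for f1. Each of those arrays also has to be pickled in a worker and sent back to the parent process, which is a plausible (though unmeasured here) contributor to the slowdown of both f1 and f2.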
You could use a multiprocessing Pool. Its map method runs a function on each element of an iterable, so first create the function that the workers will apply to each element:
def worker(vector):
    vector_2 = np.dot(vector, vector)
    return (np.exp(-vector_2) / vector_2
            * np.cos(np.tensordot(matrix, vector, axes=([2], [0]))))
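For reference, each worker call contracts the last axis of matrix with one 3-vector, so it returns an N × N array. A small check on a toy 4 × 4 × 3 array (the sizes and random values are made up for illustration) shows the same contraction written three ways:

```python
import numpy as np

rng = np.random.default_rng(0)
matrix = rng.random((4, 4, 3))   # small stand-in for the (N, N, 3) matrix
vector = np.array([1, -2, 3])    # one row of vector_data

# result[i, j] = sum_k matrix[i, j, k] * vector[k]
projected = np.tensordot(matrix, vector, axes=([2], [0]))

# Equivalent formulations of the same contraction
assert np.allclose(projected, np.einsum("ijk,k->ij", matrix, vector))
assert np.allclose(projected, (matrix * vector).sum(axis=2))

vector_2 = np.dot(vector, vector)  # squared norm: 1 + 4 + 9 = 14
term = np.exp(-vector_2) / vector_2 * np.cos(projected)
print(projected.shape, vector_2, term.shape)  # (4, 4) 14 (4, 4)
```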
Now you can create the Pool to run this function on each vector. pool.map returns a list of the results, which we can then add to pop_array, like so:
from multiprocessing import Pool
import numpy as np

def par_fun(vector_data, matrix):
    N = len(matrix)
    pop_array = np.zeros((N, N))
    # worker() reads the module-level global `matrix`, not this argument
    with Pool() as pool:
        results = pool.map(worker, vector_data)
    for res in results:
        pop_array += res
    return pop_array
Another, possibly neater, way is to use imap. From the docs:
Note that it may cause high memory usage for very long iterables. Consider using imap() or imap_unordered() with explicit chunksize option for better efficiency.
Also:
The chunksize argument is the same as the one used by the map() method. For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.
So you can use this code:
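def par_fun(vector_data, matrix):
    N = len(matrix)
    pop_array = np.zeros((N, N))
    pool_size = None  # None: one worker process per available CPU
    chunksize = 1     # number of vectors sent to a worker per task
    with Pool(pool_size) as pool:
        # The loop must stay inside the with-block: leaving it terminates the pool
        for result in pool.imap(worker, vector_data, chunksize=chunksize):
            pop_array += result
    return pop_array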
Then experiment with different pool_size and chunksize values to achieve the best results.
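As a starting point for chunksize, a hypothetical helper can roughly mirror what CPython's Pool.map does when no chunksize is given: split the tasks into about four chunks per worker. Since the results are only summed, imap_unordered (mentioned in the docs excerpt) is also a drop-in alternative to imap in par_fun; the different arrival order can change the sum only by floating-point rounding.

```python
import os


def default_like_chunksize(n_tasks, processes):
    # Hypothetical helper: about four chunks per worker process,
    # rounded up so no tasks are left over
    chunksize, extra = divmod(n_tasks, processes * 4)
    if extra:
        chunksize += 1
    return max(chunksize, 1)  # imap requires chunksize >= 1


processes = os.cpu_count() or 1
print(default_like_chunksize(10_000, processes))
```

For example, 10,000 vectors on 8 processes gives a chunksize of 313; from there, timing a few larger and smaller values is still the most reliable guide.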
Another option is to use threads instead of processes. Processes carry creation and maintenance overhead that can affect run time. To switch the code to threads, simply change the import to use the dummy wrapper:
from multiprocessing.dummy import Pool
The rest of the code stays the same.
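multiprocessing.dummy.Pool returns a multiprocessing.pool.ThreadPool, so the sketch below uses that class directly. Because threads share the parent's memory, nothing is pickled and the worker can use matrix as a closure variable. Whether threads are actually faster depends on how much of each task runs inside NumPy routines that release the GIL; that is an assumption worth measuring rather than taking for granted. The function name and toy sizes are illustrative.

```python
from multiprocessing.pool import ThreadPool  # what multiprocessing.dummy.Pool returns

import numpy as np


def par_fun_threads(vector_data, matrix, threads=None):
    N = len(matrix)
    pop_array = np.zeros((N, N))

    def worker(vector):
        # Threads share memory: matrix is used directly, nothing is pickled
        vector_2 = np.dot(vector, vector)
        return (np.exp(-vector_2) / vector_2
                * np.cos(np.tensordot(matrix, vector, axes=([2], [0]))))

    with ThreadPool(threads) as pool:
        # Results are summed in the calling thread, so pop_array needs no lock
        for result in pool.imap(worker, vector_data, chunksize=8):
            pop_array += result
    return pop_array


# Toy sizes; integers in [1, 5) keep vector_2 > 0
rng = np.random.default_rng(0)
matrix = rng.random((10, 10, 3))
vector_data = rng.integers(1, 5, size=(50, 3))
pop_array = par_fun_threads(vector_data, matrix, threads=4)
print(pop_array.shape)  # (10, 10)
```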