
Parallelize for-loop in Python over threads/cores/nodes

I intend to parallelize a for-loop in Python, shown below, that handles large data arrays. How well does parallelization over threads/cores/nodes suit this code, and how can it be implemented? Any advice is appreciated. Thanks!

All the inputs are NumPy arrays with the following typical sizes:

vector_data (int64): 1M x 3
matrix (float64): 0.1M x 0.1M x 3

Edits based on answers to the post:

Run-time tests suggest that multiprocessing causes a significant slowdown and also has higher memory requirements.

from timeit import timeit
from multiprocessing import Pool

import numpy as np
from numba import jit

def OP():
    N = len(matrix_data)
    pop_array = np.zeros((N, N))
    for vector in vector_data:
        vector_2 = np.dot(vector, vector)
        pop_array += (np.exp(-vector_2) / vector_2
                      * np.cos(np.tensordot(matrix_data, vector, axes=([2], [0]))))
    return pop_array

def worker(vector):
    vector_2 = np.dot(vector, vector)
    return (np.exp(-vector_2) / vector_2
            * np.cos(np.tensordot(matrix_data, vector, axes=([2], [0]))))

def f1():
    N = len(matrix_data)
    pop_array = np.zeros((N, N))

    with Pool() as pool:
        results = pool.map(worker, vector_data)

    for res in results:
        pop_array += res

    return pop_array

def f2():
    N = len(matrix_data)
    pop_array = np.zeros((N, N))

    with Pool() as pool:
        for result in pool.imap(worker, vector_data):
            pop_array += result

    return pop_array

@jit(parallel=True)
def f3():
    N = len(matrix_data)
    pop_array = np.zeros((N, N)) 
    for vector in vector_data:
        vector_2 = np.dot(vector, vector)
        pop_array += (np.exp(-vector_2) / vector_2
                      * np.cos(np.tensordot(matrix_data, vector, axes=([2], [0]))))
    return pop_array

max_vector_index = 150
vector_size = int(1E3)
matrix_size = int(1E2)

vector_shape = vector_size, 3
matrix_shape = matrix_size, matrix_size, 3

vector_data = np.random.randint(-max_vector_index, max_vector_index+1, vector_shape)
matrix_data = np.random.random(matrix_shape)

print(f'OP: {timeit(OP, number=10):.3e} sec')
print(f'f1: {timeit(f1, number=10):.3e} sec')
print(f'f2: {timeit(f2, number=10):.3e} sec')
print(f'f3: {timeit(f3, number=10):.3e} sec')

Following are the run-time and memory costs from sample runs (ratios in parentheses are relative to OP):

vector_size = int(1E2)
matrix_size = int(1E1)

Run time:
OP: 9.527e-02 sec
f1: 2.402e+00 sec (25.21x)
f2: 2.269e+00 sec (23.82x)
f3: 3.414e-02 sec (0.36x)

Memory:
OP: 43.0 MiB
f1: 41.9 MiB (0.97x)
f2: 41.9 MiB (0.97x)

vector_size = int(1E3)
matrix_size = int(1E2)

Run time:
OP: 1.420e+00 sec
f1: 1.448e+01 sec (10.20x)
f2: 2.051e+01 sec (14.44x)
f3: 1.213e+00 sec (0.86x)

Memory:
OP: 43.4 MiB
f1: 119.0 MiB (2.74x)
f2: 43.8 MiB (1x)

vector_size = int(1E4)
matrix_size = int(1E3)

Run time:
OP: 5.116e+02 sec
f1: 8.902e+02 sec (1.74x)
f2: 6.509e+02 sec (1.27x)

Memory:
OP: 73.9 MiB
f1: 76402.1 MiB (1033x)
f2: 209.7 MiB (2.84x)
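
The memory figure for f1 at the largest size is roughly what one would expect if pool.map keeps every per-vector result in a list until the summation loop starts. A back-of-the-envelope check, assuming each result is a dense float64 array of shape (matrix_size, matrix_size):

```python
# Rough estimate of the memory held by the list that pool.map returns in f1.
# Assumption: one dense float64 (matrix_size x matrix_size) array per vector.
vector_size = int(1E4)
matrix_size = int(1E3)

bytes_per_result = matrix_size * matrix_size * 8  # float64 takes 8 bytes
total_bytes = vector_size * bytes_per_result
total_mib = total_bytes / 2**20

print(f'one result : {bytes_per_result / 2**20:.1f} MiB')
print(f'all results: {total_mib:.1f} MiB')
```

The estimate comes to about 76,294 MiB, close to the 76402.1 MiB measured for f1. f2 adds each result to pop_array as it arrives, which is consistent with its much smaller footprint.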
asked Dec 31 '22 17:12 by Viswanath


1 Answer

You could use a multiprocessing Pool and its map method, which runs a function on each element of an iterable. First, create the function that the workers will apply to each element:

def worker(vector):
    vector_2 = np.dot(vector, vector)
    return (np.exp(-vector_2) / vector_2
            * np.cos(np.tensordot(matrix, vector, axes=([2], [0]))))

Now you can create the Pool and run this function on each vector. map returns a list of the results, which we then add to pop_array, like so:

from multiprocessing import Pool

def par_fun(vector_data, matrix):
    N = len(matrix)
    pop_array = np.zeros((N, N))

    with Pool() as pool:
        results = pool.map(worker, vector_data)

    for res in results:
        pop_array += res

    return pop_array

Another, possibly neater, way is to use imap. From the docs on map:

Note that it may cause high memory usage for very long iterables. Consider using imap() or imap_unordered() with explicit chunksize option for better efficiency.

Also:

The chunksize argument is the same as the one used by the map() method. For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.

So you can use this code:

def par_fun(vector_data, matrix):
    N = len(matrix)
    pop_array = np.zeros((N, N))

    pool_size = None
    chunksize = 1

    with Pool(pool_size) as pool:
        for result in pool.imap(worker, vector_data, chunksize=chunksize):
            pop_array += result

    return pop_array

And play with different pool_size and chunksize values to achieve the best results.
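
Note that chunksize only batches the tasks; each vector still sends a full N x N array back to the parent. A possible variation, sketched below, is to let every task sum a whole block of vectors and return a single N x N partial sum. The names init_worker, partial_sum, par_fun_chunked, n_chunks and pool_cls are hypothetical helpers introduced for this sketch, not part of the code above, and the matrix is handed to each worker once through the Pool initializer instead of relying on a global. Treat it as a starting point to time, not a guaranteed speed-up.

```python
import numpy as np
from multiprocessing import Pool

_matrix = None  # filled in once per worker by init_worker (hypothetical helper)


def init_worker(matrix):
    """Store the matrix in a module-level name so it is passed once per worker."""
    global _matrix
    _matrix = matrix


def partial_sum(chunk):
    """Sum the per-vector terms for one block of vectors of shape (k, 3)."""
    total = np.zeros(_matrix.shape[:2])
    for vector in chunk:
        vector_2 = np.dot(vector, vector)
        total += (np.exp(-vector_2) / vector_2
                  * np.cos(np.tensordot(_matrix, vector, axes=([2], [0]))))
    return total


def par_fun_chunked(vector_data, matrix, n_chunks=8, pool_cls=Pool):
    """Accumulate the sum from one task, and one N x N result, per block."""
    pop_array = np.zeros(matrix.shape[:2])
    chunks = np.array_split(vector_data, n_chunks)  # blocks of rows
    with pool_cls(initializer=init_worker, initargs=(matrix,)) as pool:
        # The order of the partial sums does not matter for the total.
        for result in pool.imap_unordered(partial_sum, chunks):
            pop_array += result
    return pop_array
```

When this runs as a script with process workers, the call to par_fun_chunked should sit under an `if __name__ == "__main__":` guard. A larger n_chunks gives finer load balancing at the cost of more N x N transfers.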


Another option is to use threads instead of processes. Processes have creation and maintenance overhead that might affect run time. To switch the code to threads, simply change the import to use the dummy wrapper:

from multiprocessing.dummy import Pool

The rest of the code stays the same.
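
For reference, a self-contained sketch of the threaded variant. It differs from the code above in one assumed detail: matrix is bound to the worker with functools.partial instead of being read from a global, which is cheap here because threads share the parent's memory. The name par_fun_threads is hypothetical. Whether threads actually speed things up depends on how much of the work runs outside the interpreter lock, so it is worth timing on your own data.

```python
from functools import partial
from multiprocessing.dummy import Pool  # thread-backed Pool with the same API

import numpy as np


def worker(matrix, vector):
    """Compute the N x N contribution of a single vector."""
    vector_2 = np.dot(vector, vector)
    return (np.exp(-vector_2) / vector_2
            * np.cos(np.tensordot(matrix, vector, axes=([2], [0]))))


def par_fun_threads(vector_data, matrix, pool_size=None, chunksize=1):
    """Sum the per-vector contributions using a pool of threads."""
    N = len(matrix)
    pop_array = np.zeros((N, N))
    with Pool(pool_size) as pool:
        # imap yields results in input order; each one is added as it arrives.
        for result in pool.imap(partial(worker, matrix), vector_data,
                                chunksize=chunksize):
            pop_array += result
    return pop_array
```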

answered Jan 14 '23 00:01 by Tomerikoo