The following code parallelizes a for-loop.
import networkx as nx
import numpy as np
from joblib import Parallel, delayed
import multiprocessing

def core_func(repeat_index, G, numpy_array_2D):
    for u in G.nodes():
        numpy_array_2D[repeat_index][u] = 2
    return

if __name__ == "__main__":
    G = nx.erdos_renyi_graph(100000, 0.99)
    nRepeat = 5000
    numpy_array = np.zeros([nRepeat, G.number_of_nodes()])
    Parallel(n_jobs=4)(delayed(core_func)(repeat_index, G, numpy_array)
                       for repeat_index in range(nRepeat))
    print(np.mean(numpy_array))
As can be seen, the expected printed value is 2. However, when I run the code on a cluster (multi-core, shared memory), it prints 0.0.
I think the problem is that each worker creates its own copy of the numpy_array object, so the one created in the main function is never updated. How can I modify the code so that numpy_array can be updated by the workers?
n_jobs is an integer specifying the maximum number of concurrently running workers. If 1 is given, no joblib parallelism is used at all, which is useful for debugging. If set to -1, all CPUs are used. For n_jobs below -1, (n_cpus + 1 + n_jobs) workers are used; for example, with n_jobs=-2, all CPUs but one are used.
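A quick illustration of that negative-n_jobs arithmetic, assuming multiprocessing.cpu_count() reports the CPUs joblib sees:

import multiprocessing

n_cpus = multiprocessing.cpu_count()
# n_jobs=-1 uses all CPUs; n_jobs=-2 resolves to n_cpus + 1 + (-2),
# i.e. all CPUs but one.
print("all CPUs:", n_cpus)
print("n_jobs=-2 ->", n_cpus + 1 + (-2), "workers")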
Below is a list of simple steps to use joblib for parallel computing; a minimal example follows the list.
1. Wrap the normal Python function call in joblib's delayed() method.
2. Create a Parallel object with the number of processes/threads to use for parallel computing.
3. Pass the list of delayed-wrapped functions to the Parallel instance.
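A minimal sketch of those three steps (math.sqrt is just a stand-in workload):

from math import sqrt
from joblib import Parallel, delayed

# 1. delayed(sqrt) wraps the call; 2. Parallel(n_jobs=2) creates the pool;
# 3. the generator of delayed calls is passed to the Parallel instance.
results = Parallel(n_jobs=2)(delayed(sqrt)(i) for i in range(10))
print(results)  # [0.0, 1.0, 1.41..., ...]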
Joblib is a set of tools to provide lightweight pipelining in Python. In particular:
- transparent disk-caching of functions and lazy re-evaluation (memoize pattern)
- easy simple parallel computing
The delayed function is a simple trick to be able to create a tuple (function, args, kwargs) with a function-call syntax. Under Windows, the use of multiprocessing.Pool requires protecting the main loop of code (with an if __name__ == "__main__" guard) to avoid recursive spawning of subprocesses when using joblib.
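To see the tuple trick concretely: calling a delayed-wrapped function does not run it, it only records the call.

from joblib import delayed

def f(a, b, scale=1):
    return (a + b) * scale

task = delayed(f)(1, 2, scale=3)
print(task)  # (<function f ...>, (1, 2), {'scale': 3}) -- recorded, not executed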
joblib uses a multiprocessing pool of processes by default, as its manual says:
Under the hood, the Parallel object creates a multiprocessing pool that forks the Python interpreter into multiple processes to execute each of the items of the list. The delayed function is a simple trick to be able to create a tuple (function, args, kwargs) with a function-call syntax.
This means that every process inherits the original state of the array, but whatever it writes into it is lost when the process exits. Only the function result is delivered back to the calling (main) process. But you do not return anything, so None is returned.
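One consequence: you could return the computed row from each worker and assemble the rows in the main process instead of mutating a shared array. A minimal sketch of that approach, keeping this question's names but with smaller sizes so it runs quickly:

import networkx as nx
import numpy as np
from joblib import Parallel, delayed

def core_func(repeat_index, G):
    # Build and return the row instead of writing into a shared array.
    row = np.zeros(G.number_of_nodes())
    for u in G.nodes():
        row[u] = 2
    return row

if __name__ == "__main__":
    G = nx.erdos_renyi_graph(100, 0.5)
    nRepeat = 50
    rows = Parallel(n_jobs=4)(delayed(core_func)(i, G) for i in range(nRepeat))
    numpy_array = np.vstack(rows)
    print(np.mean(numpy_array))  # prints 2.0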
To make the shared array modifiable, you have two ways: using threads or using shared memory.
Threads, unlike processes, share memory, so you can write to the array and every job will see the change. According to the joblib manual, it is done this way:
Parallel(n_jobs=4, backend="threading")(delayed(core_func)(repeat_index, G, numpy_array)
                                        for repeat_index in range(nRepeat))
When you run it:
$ python r1.py
2.0
However, when you write more complex things into the array, make sure you properly handle locks around the data or its pieces, or you will hit race conditions (google it).
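For instance, a minimal sketch of guarding a shared read-modify-write with a lock under the threading backend (the add_chunk/totals names are hypothetical, not from the original code):

import threading
import numpy as np
from joblib import Parallel, delayed

lock = threading.Lock()
totals = np.zeros(1)  # shared accumulator, a read-modify-write target

def add_chunk(values):
    partial = sum(values)
    with lock:               # serialize the read-modify-write step
        totals[0] += partial

if __name__ == "__main__":
    chunks = [range(i, i + 100) for i in range(0, 1000, 100)]
    Parallel(n_jobs=4, backend="threading")(delayed(add_chunk)(c) for c in chunks)
    print(totals[0])  # 499500.0, the sum of 0..999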
Also read carefully about the GIL, as computational multithreading in Python is limited (unlike I/O multithreading).
If you still need processes (e.g. because of the GIL), you can put that array into shared memory. This is a slightly more complicated topic, but a joblib + numpy shared memory example is also shown in the joblib manual.
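A minimal sketch of that idea using a writable numpy memmap as the shared buffer, in the spirit of the manual's example (the temporary file path is an assumption, and smaller sizes are used so it runs quickly):

import os
import tempfile
import networkx as nx
import numpy as np
from joblib import Parallel, delayed

def core_func(repeat_index, G, mmap_2D):
    # Every worker writes into the same file-backed buffer.
    for u in G.nodes():
        mmap_2D[repeat_index][u] = 2

if __name__ == "__main__":
    G = nx.erdos_renyi_graph(100, 0.5)
    nRepeat = 50
    folder = tempfile.mkdtemp()
    mmap_path = os.path.join(folder, "numpy_array.mmap")
    # mode="w+" creates a writable memory-map visible to all processes.
    numpy_array = np.memmap(mmap_path, dtype=np.float64,
                            shape=(nRepeat, G.number_of_nodes()), mode="w+")
    Parallel(n_jobs=4)(delayed(core_func)(i, G, numpy_array)
                       for i in range(nRepeat))
    print(np.mean(numpy_array))  # expected: 2.0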