I have a module A that does a basic map/reduce by taking data and sending it to modules B, C, D, etc. for analysis and then joining their results together.
But it appears that modules B, C, D, etc. cannot themselves create a multiprocessing pool; when they try, I get
AssertionError: daemonic processes are not allowed to have children
Is it possible to parallelize these jobs some other way?
For clarity, here's an (admittedly bad) baby example. (I would normally try/except, but you get the gist.)
A.py:
import B
from multiprocessing import Pool

def main():
    p = Pool()
    results = p.map(B.foo, range(10))
    p.close()
    p.join()
    return results
B.py:
from multiprocessing import Pool

def foo(x):
    # creating a Pool here, inside a Pool worker, raises the AssertionError above
    p = Pool()
    results = p.map(str, range(x))
    p.close()
    p.join()
    return results
Is it possible to have a pool inside of a pool?
Yes, it is possible, though it might not be a good idea unless you want to raise an army of zombies. From Python Process Pool non-daemonic?:
import multiprocessing.pool
from contextlib import closing
from functools import partial

class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False

    def _set_daemon(self, value):
        pass

    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class Pool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

def foo(x, depth=0):
    if depth == 0:
        return x
    else:
        with closing(Pool()) as p:
            return p.map(partial(foo, depth=depth-1), range(x + 1))

if __name__ == "__main__":
    from pprint import pprint
    pprint(foo(10, depth=2))
[[0],
[0, 1],
[0, 1, 2],
[0, 1, 2, 3],
[0, 1, 2, 3, 4],
[0, 1, 2, 3, 4, 5],
[0, 1, 2, 3, 4, 5, 6],
[0, 1, 2, 3, 4, 5, 6, 7],
[0, 1, 2, 3, 4, 5, 6, 7, 8],
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]
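Note that on Python 3.8 and later, Pool creates its workers through a multiprocessing context object, so the bare Process = NoDaemonProcess class attribute above is no longer picked up. A sketch of a context-based variant (the names NoDaemonContext and NestablePool are just illustrative):

import multiprocessing
import multiprocessing.pool

class NoDaemonProcess(multiprocessing.Process):
    # always report daemon=False and silently ignore attempts to set it
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass

class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

class NestablePool(multiprocessing.pool.Pool):
    # route the pool through a context whose Process class is non-daemonic
    def __init__(self, *args, **kwargs):
        kwargs["context"] = NoDaemonContext()
        super().__init__(*args, **kwargs)

NestablePool can then be used in foo() in place of the Pool subclass above.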
concurrent.futures supports it by default:
# $ pip install futures # on Python 2
from concurrent.futures import ProcessPoolExecutor as Pool
from functools import partial

def foo(x, depth=0):
    if depth == 0:
        return x
    else:
        with Pool() as p:
            return list(p.map(partial(foo, depth=depth-1), range(x + 1)))

if __name__ == "__main__":
    from pprint import pprint
    pprint(foo(10, depth=2))
It produces the same output. This works because ProcessPoolExecutor's worker processes are not started as daemons, so they are allowed to spawn children of their own.
Is it possible to parallelize these jobs some other way?
Yes. For example, look at how celery lets you build complex workflows.
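For instance, the map/reduce from the question maps fairly directly onto a Celery chord, which runs a group of tasks in parallel and then passes the collected results to a callback. A minimal sketch, assuming a running broker and result backend; the broker URL and the task names analyze and join_results are illustrative, not part of the original code:

from celery import Celery, chord

# illustrative broker/backend URLs; chords require a result backend
app = Celery("tasks",
             broker="redis://localhost:6379/0",
             backend="redis://localhost:6379/0")

@app.task
def analyze(x):
    # stands in for B.foo / C.foo / D.foo: each call runs on a worker
    return str(x)

@app.task
def join_results(results):
    # the "reduce" step: receives the list of analyze() results
    return results

if __name__ == "__main__":
    # chord(header)(body): run the group in parallel, then call the body
    # with the collected results
    async_result = chord(analyze.s(i) for i in range(10))(join_results.s())
    print(async_result.get())

Here nothing is nested inside a daemonic process: the parallelism lives in the Celery workers, so the daemonic-children restriction never comes up.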