1. Why does the following Python code using the concurrent.futures
module hang forever?
import concurrent.futures

class A:
    def f(self):
        print("called")

class B(A):
    def f(self):
        executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)
        executor.submit(super().f)

if __name__ == "__main__":
    B().f()
The call raises an invisible exception [Errno 24] Too many open files (to see it, replace the line executor.submit(super().f) with print(executor.submit(super().f).exception())).
However, replacing ProcessPoolExecutor with ThreadPoolExecutor prints "called" as expected.
2. Why does the following Python code using the multiprocessing.pool module raise the exception AssertionError: daemonic processes are not allowed to have children?
import multiprocessing.pool

class A:
    def f(self):
        print("called")

class B(A):
    def f(self):
        pool = multiprocessing.pool.Pool(2)
        pool.apply(super().f)

if __name__ == "__main__":
    B().f()
However, replacing Pool with ThreadPool prints "called" as expected.
Environment: CPython 3.7, macOS 10.14.
Both concurrent.futures.ProcessPoolExecutor and multiprocessing.pool.Pool use multiprocessing.queues.Queue to pass the work function object from the caller to the worker process. Queue uses the pickle module to serialize and deserialize, but pickle does not properly handle a bound method object whose instance is of a child class:
# run inside B.f (so that super() resolves), with pickle imported
f = super().f
print(f)
pf = pickle.loads(pickle.dumps(f))
print(pf)
outputs:
<bound method A.f of <__main__.B object at 0x104b24da0>>
<bound method B.f of <__main__.B object at 0x104cfab38>>
A.f becomes B.f. This effectively makes B.f call B.f again in the worker process, recursing indefinitely; each step creates yet another pool inside a worker, which is what produces the errors seen in the question (too many open files with ProcessPoolExecutor, and the assertion with Pool, since pool workers are daemonic and may not create child processes).
pickle.dumps relies on the __reduce__ method of the bound method object. In my opinion its implementation does not consider this scenario: it does not preserve the real __func__ object, but only rebuilds the method from the instance (the B() object) and the simple name ("f"), which resolves to B.f. Very likely a bug.
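As a self-contained sketch of this behaviour (same class names as the question; B.f's body is irrelevant here so it is stubbed out, and the addresses in the reprs will differ), the rebinding can be reproduced without any process pool:

import pickle

class A:
    def f(self):
        print("called")

class B(A):
    def f(self):
        pass

b = B()
bound = A.f.__get__(b)     # the same kind of object super().f yields inside B.f: __func__ is A.f
print(bound)               # <bound method A.f of <__main__.B object at 0x...>>
print(bound.__reduce__())  # the (callable, args) recipe pickle will use, built from the instance and the method name

restored = pickle.loads(pickle.dumps(bound))
print(restored)                  # <bound method B.f of <__main__.B object at 0x...>>
print(restored.__func__ is B.f)  # True: the original A.f is lost after the round trip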
The good news is that, since we know where the issue is, we can fix it by registering our own reduction function that recreates the bound method from the original function (A.f) and the instance (the B() object):
import types
import copyreg
import multiprocessing

def my_reduce(obj):
    # rebind the original function object to the instance instead of
    # looking the name up again on the instance's class
    return (obj.__func__.__get__, (obj.__self__,))

copyreg.pickle(types.MethodType, my_reduce)                      # plain pickle
multiprocessing.reduction.register(types.MethodType, my_reduce)  # multiprocessing's ForkingPickler
We can do this because functions are descriptors: obj.__func__.__get__(obj.__self__) recreates the bound method from the function object itself.
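For completeness, here is a sketch of how this registration could be applied to the first example from the question (same class names as above; assuming CPython 3.7, where the registration only needs to run in the parent process before the task is pickled):

import concurrent.futures
import copyreg
import multiprocessing.reduction
import types

def my_reduce(obj):
    # recreate the bound method from the original function and the instance
    return (obj.__func__.__get__, (obj.__self__,))

copyreg.pickle(types.MethodType, my_reduce)
multiprocessing.reduction.register(types.MethodType, my_reduce)

class A:
    def f(self):
        print("called")

class B(A):
    def f(self):
        with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
            future = executor.submit(super().f)
            print(future.exception())  # None once the method is rebound correctly

if __name__ == "__main__":
    B().f()  # the worker process prints "called"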
PS: I have filed a bug report.