I want to apply a function in parallel using multiprocessing.Pool. The problem is that if one function call triggers a segmentation fault, the Pool hangs forever. Does anybody have an idea how I can make a Pool that detects when something like this happens and raises an error?
The following example shows how to reproduce it (requires scikit-learn > 0.14):
import numpy as np
from sklearn.ensemble import gradient_boosting
import time
from multiprocessing import Pool

class Bad(object):
    tree_ = None

def fit_one(i):
    if i == 3:
        # this will segfault
        bad = np.array([[Bad()] * 2], dtype=np.object)
        gradient_boosting.predict_stages(bad,
                                         np.random.rand(20, 2).astype(np.float32),
                                         1.0, np.random.rand(20, 2))
    else:
        time.sleep(1)
    return i

pool = Pool(2)
out = pool.imap_unordered(fit_one, range(10))

# we will never see 3
for o in out:
    print o
As described in the comments, this just works in Python 3 if you use concurrent.futures.ProcessPoolExecutor instead of multiprocessing.Pool: when a worker process dies abruptly, the executor raises a BrokenProcessPool instead of hanging.
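A minimal Python 3 sketch of that behavior (simulating the segfault by sending SIGSEGV to the worker with os.kill, rather than using the scikit-learn reproducer above):

```python
import os
import signal
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool

def fit_one(i):
    if i == 3:
        # stand-in for the real segfault: kill this worker with SIGSEGV
        os.kill(os.getpid(), signal.SIGSEGV)
    return i

if __name__ == '__main__':
    try:
        with ProcessPoolExecutor(max_workers=2) as executor:
            for result in executor.map(fit_one, range(10)):
                print(result)
    except BrokenProcessPool:
        print("a worker crashed; the executor raised instead of hanging")
```

Instead of waiting forever for task 3, the loop terminates with BrokenProcessPool as soon as the executor notices the dead worker.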
If you're stuck on Python 2, the best option I've found is to use the timeout argument on the result objects returned by Pool.apply_async and Pool.map_async. For example:
pool = Pool(2)
results = [pool.apply_async(fit_one, (i,)) for i in range(10)]
for res in results:
    print res.get(timeout=1000)  # allow 1000 seconds max per task
This works as long as you have an upper bound on how long a child process should take to complete a task.
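If you want the loop to keep going rather than crash on the first lost task, you can catch multiprocessing.TimeoutError per result. A sketch (again simulating the segfault with os.kill; the 5-second timeout is an arbitrary bound chosen for illustration):

```python
import os
import signal
import multiprocessing
from multiprocessing import Pool

def fit_one(i):
    if i == 3:
        # stand-in for the real segfault: kill this worker with SIGSEGV
        os.kill(os.getpid(), signal.SIGSEGV)
    return i

if __name__ == '__main__':
    pool = Pool(2)
    # one AsyncResult per task, so a crashed task only times out its own get()
    results = [pool.apply_async(fit_one, (i,)) for i in range(10)]
    for i, res in enumerate(results):
        try:
            print(res.get(timeout=5))
        except multiprocessing.TimeoutError:
            print("task %d never returned (worker probably crashed)" % i)
    pool.terminate()
    pool.join()
```

The Pool replaces the dead worker, so the remaining tasks still complete; only the result of the crashed task is lost, reported as a timeout.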