I need to run many processes, but not all at the same time, say 4 processes at a time. multiprocessing.Pool is exactly what I need. But the problem is that I need to terminate a process if it runs longer than a timeout (e.g. 3 seconds). Pool only supports waiting for a timeout for all processes, not for each of them. This is what I need:
def f():
    process_but_kill_if_it_takes_more_than_3_sec()

pool.map(f, inputs)
I couldn't find a simple way to use Pool with timeouts. There is a solution from Eli Bendersky: a function that limits the execution time of an arbitrary function via Thread.join(timeout), roughly like the sketch below.
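The core of that approach looks roughly like this (my paraphrase, not Bendersky's exact code, and the timelimited name is mine; note the worker thread cannot actually be killed, the caller merely stops waiting for it):

import threading

def timelimited(timeout, func, *args):
    # Run func in a daemon thread and wait at most `timeout` seconds.
    result = []
    t = threading.Thread(target=lambda: result.append(func(*args)))
    t.daemon = True
    t.start()
    t.join(timeout)
    return result[0] if result else None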
It works (although its stop method doesn't work well). But this approach spawns an extra, unnecessary thread per call while the process's main thread just sits waiting, only because we need a timeout controller. It should be possible to control all the timeouts from a single point, something like this:
import time
from multiprocessing import Process

def f(n):
    time.sleep(n)

timeout = 3
tasks = [1, 2, 4, 1, 8, 2]
procs = []
pool_len = 4

while len(tasks) > 0 or len(procs) > 0:
    if len(tasks) > 0 and len(procs) < pool_len:
        n = tasks.pop(0)
        p = Process(target=f, args=(n,))
        p.start()
        procs.append({'n': n, 'p': p, 't': time.time() + timeout})
    for d in procs[:]:  # iterate over a copy: items are removed as we go
        if not d['p'].is_alive():
            procs.remove(d)
            print('%s finished' % d['n'])
        elif d['t'] < time.time():
            d['p'].terminate()
            procs.remove(d)
            print('%s killed' % d['n'])
    time.sleep(0.05)
The output should be:
1 finished
1 finished
2 finished
4 killed
2 finished
8 killed
Question: Is there a way to use Pool to solve this?
You could make f(n) cooperative, so that it always finishes within a timeout (as in GUI/network event handlers).
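For example, a cooperative f(n) could check a deadline between small chunks of work (a minimal sketch; the chunking and the n * n result are made up for illustration):

import time

def f(n, timeout=3):
    deadline = time.monotonic() + timeout
    for _ in range(n):                    # pretend each iteration is one unit of work
        if time.monotonic() >= deadline:
            return None                   # deadline reached: give up voluntarily
        time.sleep(1)                     # stand-in for a short chunk of real work
    return n * n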
If you can't make it cooperative, then the only reliable option is to kill the process that runs the function:
import multiprocessing as mp

def run_with_timeout(timeout, func, *args):
    receive_end, send_end = mp.Pipe(duplex=False)
    p = mp.Process(target=func, args=args, kwargs=dict(send_end=send_end))
    p.daemon = True
    p.start()
    send_end.close()  # the child must be the only one with it open
    p.join(timeout)
    if p.is_alive():
        debug('%s timeout', args)
        p.terminate()
    else:
        return receive_end.recv()  # get the value from the child
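A quick sanity check of run_with_timeout on its own (square is a throwaway helper; the child function must be defined at module top level so it can be pickled, and it must accept send_end; this assumes the snippets here are combined into one script so debug is defined):

def square(n, send_end):
    send_end.send(n * n)  # report the result back through the pipe

if __name__ == '__main__':
    print(run_with_timeout(3, square, 2))  # prints 4; finishes well within 3 s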
The disadvantage is that it requires a new process for each function call (the analog of Pool's maxtasksperchild=1).
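For comparison, a plain Pool can also recycle a process after every task, but it still has no way to terminate an individual task that exceeds a timeout, which is exactly the missing piece (a sketch, for contrast):

from multiprocessing import Pool

# Each worker is replaced after a single task, mirroring the
# one-process-per-call cost above, yet Pool offers no per-task
# kill switch for tasks that run too long.
pool = Pool(processes=4, maxtasksperchild=1)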
It is easy to run 4 processes at the same time using a thread pool:
#!/usr/bin/env python
import logging
import time
from functools import partial
from multiprocessing.pool import ThreadPool

debug = logging.getLogger(__name__).debug

def run_mp(n, send_end):
    start = time.time()
    debug('%d starting', n)
    try:
        time.sleep(n)
    except Exception as e:
        debug('%d error %s', n, e)
    finally:
        debug('%d done, elapsed: %.3f', n, time.time() - start)
    send_end.send({n: n*n})

if __name__ == "__main__":
    tasks = [1, 2, 4, 1, 8, 2]
    logging.basicConfig(format="%(relativeCreated)04d %(message)s",
                        level=logging.DEBUG)
    # run_with_timeout() is the function defined above
    print(ThreadPool(processes=4).map(partial(run_with_timeout, 3, run_mp), tasks))
0027 1 starting
0028 2 starting
0030 4 starting
0031 1 starting
1029 1 done, elapsed: 1.002
1032 1 done, elapsed: 1.002
1033 8 starting
1036 2 starting
2031 2 done, elapsed: 2.003
3029 (4,) timeout
3038 2 done, elapsed: 2.003
4035 (8,) timeout
[{1: 1}, {2: 4}, None, {1: 1}, None, {2: 4}]
Beware: there could be forking + threading issues; you could use a forkserver process to work around them.
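For example, on Python 3 you could switch the start method to forkserver (a sketch; forkserver is available on Unix, Python 3.4+):

import multiprocessing as mp

if __name__ == '__main__':
    # Children are forked from a clean server process instead of the
    # (possibly multi-threaded) main process.
    mp.set_start_method('forkserver')  # call once, before creating any processes
    # ... then run the ThreadPool/Process code from above ...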