I'm trying to use os.fork, os.waitpid, and threading to dispatch some time-consuming work to a child process, wait at most a certain amount of time for it to complete, and then continue on, leaving it running in the background. There is also a secondary timer in the child that prevents it from running for too long in the background, like so:
Fork execution.
In the fork's child:
    Start a thread to execute a task.
    If that thread runs for longer than X milliseconds:
        thread.stop()  # a custom method that softly stops the thread
In the fork's parent:
    If the child's PID stays running for longer than Y milliseconds:
        return -1
    else:
        return 0
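For concreteness, the parent's half of that outline can be done without any threads at all, by polling os.waitpid with os.WNOHANG until a deadline passes. A minimal Python 3 sketch (the 50 ms placeholder work and the function name are mine, just for illustration):

```python
import os
import time

def dispatch_with_deadline(deadline_ms):
    """Fork a worker; return 0 if it exits within deadline_ms, else -1."""
    pid = os.fork()
    if pid == 0:
        time.sleep(0.05)     # placeholder for the real work
        os._exit(0)          # leave the child without running parent cleanup
    # Parent: poll for the child without blocking.
    deadline = time.monotonic() + deadline_ms / 1000.0
    while time.monotonic() < deadline:
        done_pid, _status = os.waitpid(pid, os.WNOHANG)
        if done_pid == pid:  # child has exited and been reaped
            return 0
        time.sleep(0.001)
    return -1                # child still running; leave it in the background

print(dispatch_with_deadline(500))   # child finishes in ~50ms -> prints 0
```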
After returning a value, I want the script to terminate. The child can and should keep running if it isn't done.
The code I have tried (abbreviated) is:
import os
import threading
from threading import Thread
from time import sleep

class MyCustomThread(threading.Thread):
    abort = False
    def run(self):
        counter = 0
        while True:
            if self.abort: break
            counter += 1
            if counter == 30: self.stop()
            sleep(1)
        return 0
    def stop(self):
        self.abort = True
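As an aside, the soft-stop flag used above is conventionally written with threading.Event, which is safe to set from another thread. A minimal Python 3 sketch (class and method names are mine, not from the code above):

```python
import threading
import time

class StoppableWorker(threading.Thread):
    def __init__(self):
        super().__init__()
        self._stop_event = threading.Event()

    def run(self):
        # Loop until stop() is requested, doing one small unit of work per pass.
        while not self._stop_event.is_set():
            time.sleep(0.01)   # placeholder for one unit of real work

    def stop(self):
        self._stop_event.set()

worker = StoppableWorker()
worker.start()
worker.stop()      # request a soft stop
worker.join(1.0)   # thread exits at its next loop check
```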
def test():
    X = 2000
    Y = 500
    pid = os.fork()
    if pid == 0:
        thread1 = MyCustomThread() # sleeps for 30 seconds and ends
        thread1.start()
        print "Started 1!"
        timeout = X # 2000ms
        while timeout > 0:
            if not thread1.is_alive(): return "custom thread finished before the deadline!"
            timeout -= 1
            sleep(0.001)
        if thread1.is_alive():
            return "custom thread didn't finish before the deadline!"
            thread1.stop()
        exit()
    else:
        print pid
        thread2 = Thread(target = os.waitpid, args = (pid, 0))
        thread2.start()
        print "Started 2!"
        timeout2 = Y # 500ms
        while timeout2 > 0:
            if not thread2.is_alive(): return "child PID finished!"
            timeout2 -= 1
            sleep(0.001)
        if thread2.is_alive():
            return "child PID didn't finish yet!"
print test()
print "all done!"
The output is correct, in that I get
1171
Started 2!
Started 1!
child PID didn't finish yet!
all done!
custom thread didn't finish before the deadline!
all done!
But then the script doesn't exit! It sleeps for the remaining 28 seconds before finally terminating.
How do I make the main execution of this script complete after the forked parent returns a value? As I said before, the child can and should keep running in the background; it just shouldn't block execution of the next task on the parent.
I really don't even care if the child can print output to standard out--in the actual implementation, all it's doing is talking to a database, so it doesn't need to report anything interactively. The parent, however, needs to dispatch the child, wait at most Y milliseconds for the child to finish, and then (as far as whoever invoked the script is concerned) end the script's execution so that the next thing can be done. The other timer (X) isn't relevant, I think; it only exists to keep the child from running too long in the background.
Any ideas? I'm probably approaching this totally wrong, so "start over and do it _ way" ideas are welcome.
Try this one; it doesn't use threading, just pure fork/waitpid/alarm/sighandler:
import os
import signal
import time

child_exited = False

def sigh(signum, frame):
    global child_exited
    if signum == signal.SIGALRM:
        print "custom thread didn't finish before the deadline!"
        #forced exit:
        exit(1)
    if signum == signal.SIGCHLD:
        (pid, status) = os.waitpid(-1, 0)
        print "child exited with status: " + str(os.WEXITSTATUS(status))
        child_exited = True

def test():
    global child_exited
    pid = os.fork()
    if pid == 0:
        signal.signal(signal.SIGALRM, sigh)
        signal.alarm(30)
        #do work:
        print "Started 1"
        time.sleep(60)
        #clean exit:
        exit(0)
    elif pid > 0:
        signal.signal(signal.SIGCHLD, sigh)
        print "Started 2"
        #this sleep will return prematurely if the child exits
        time.sleep(10)
        if not child_exited:
            print "child PID didn't finish yet!"
    else:
        print "fork() failed"

test()
print "all done!"
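One caveat if you port this to Python 3: since PEP 475, time.sleep() is automatically restarted after a handled signal, so the parent's sleep only returns early if the handler raises an exception. A minimal Python 3 sketch of the alarm-as-timeout idea with a raising handler (the Timeout class and handler name are mine, not from the answer above):

```python
import signal
import time

class Timeout(Exception):
    pass

def on_alarm(signum, frame):
    # Raising here is what actually interrupts time.sleep() on Python 3.
    raise Timeout

signal.signal(signal.SIGALRM, on_alarm)
signal.alarm(1)          # deliver SIGALRM in 1 second

try:
    time.sleep(5)        # would block 5s, but the alarm cuts it short
    result = "finished"
except Timeout:
    result = "timed out"
finally:
    signal.alarm(0)      # cancel any pending alarm

print(result)            # -> timed out
```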
This is not an exact answer to your question, but rather a "start over and do it _ way" idea.
You could use the multiprocessing module. The function Pool.apply_async() allows you to execute a function in the background, and the returned AsyncResult object features wait() and get() methods with a timeout parameter.
Your code would essentially become (untested):

import multiprocessing

def computationally_intensive():
    # whatever
    pass

p = multiprocessing.Pool(1)
deferred = p.apply_async(computationally_intensive)
try:
    result = deferred.get(10) # 10 being the timeout, in seconds
except multiprocessing.TimeoutError:
    # Handle time out
    # ...
    p.terminate()