I need to stop/kill all processes when any error or exception occurs. I found a solution on Stack Overflow that kills all processes using psutil, but from time to time I have an issue: after psutil kills the child and main processes, new processes may start and the code continues to execute.
    import multiprocessing
    import os

    import psutil


    class MyClass:
        parent_pid = 0
        ids_list = range(300)

        def main(self):
            self.parent_pid = os.getpid()
            pool = multiprocessing.Pool(3)
            for osm_id in self.ids_list:
                pool.apply_async(self.handle_country_or_region,
                                 kwds=dict(country_id=osm_id),
                                 error_callback=self.kill_proc_tree)
            pool.close()
            pool.join()

        # Note: error_callback calls this with the exception as its only
        # argument, so including_parent receives the exception object.
        def kill_proc_tree(self, including_parent=True):
            parent = psutil.Process(self.parent_pid)
            children = parent.children(recursive=True)
            for child in children:
                child.kill()
            psutil.wait_procs(children, timeout=5)
            if including_parent:
                parent.kill()
                parent.wait(5)

        def handle_country_or_region(self, country_id=None, queue=None):
            pass  # here I do some task
It seems that I need to terminate the pool rather than kill the processes, but in this case, if I do
    pool.close()
    pool.terminate()
    pool.join()
my terminal stops responding: the new line is completely empty (i.e. without ">>>") and nothing happens.
Ideally I want the following flow: if there is any error or exception, stop/kill all code execution and return to the interactive prompt in the terminal.
Can anyone help me make this work properly? I use Python 3.5 and Ubuntu 15.10.
You can forcefully stop the tasks in a process pool with the Pool.terminate() method, which terminates all child worker processes immediately.
You can kill all child processes by first getting the list of active child processes from multiprocessing.active_children() and then calling terminate() or kill() on each Process instance (Process.kill() is only available from Python 3.7 onwards).
A single process can be stopped immediately with its terminate() method, for example to end a child process that was created from a target function before that function has finished executing.
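The active_children() approach described above can be sketched roughly as follows. This is only an illustration, not code from the question: sleep_forever and stop_all_children are hypothetical names, and the sketch assumes the current process has no other multiprocessing children that it wants to keep alive.

```python
import multiprocessing
import time


def sleep_forever():
    """Hypothetical long-running task that never returns on its own."""
    while True:
        time.sleep(1)


def stop_all_children(timeout=5):
    """Terminate every live child of this process and return their exit codes."""
    children = multiprocessing.active_children()
    for child in children:
        child.terminate()  # on Python 3.7+ child.kill() could be used instead
    for child in children:
        child.join(timeout)  # reap each child so it does not linger as a zombie
    return [child.exitcode for child in children]


if __name__ == "__main__":
    for _ in range(3):
        multiprocessing.Process(target=sleep_forever, daemon=True).start()
    print(stop_all_children())
```

Joining after terminate() matters here: the exit code of a child only becomes available once the child has been reaped.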
A pool can be shut down with either Pool.close() or Pool.terminate(). close() stops the pool from accepting new tasks and lets the worker processes exit once all issued tasks have finished (call join() afterwards to wait for that), whereas terminate() stops all worker processes immediately, even if they are currently processing tasks.
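To make the difference concrete, here is a small sketch that runs the same hypothetical task (slow_square, not part of the question) under both shutdown styles. The delays are arbitrary values chosen so that terminate() is reached long before any task could finish.

```python
import multiprocessing
import time


def slow_square(x, delay):
    """Hypothetical task: sleep for `delay` seconds, then return x squared."""
    time.sleep(delay)
    return x * x


def graceful_shutdown():
    """close() + join(): every task that was issued runs to completion."""
    pool = multiprocessing.Pool(2)
    results = [pool.apply_async(slow_square, (i, 0.1)) for i in range(4)]
    pool.close()  # refuse new tasks; tasks already issued keep running
    pool.join()   # block until the workers have exited
    return [r.get() for r in results]


def forced_shutdown():
    """terminate() + join(): workers are stopped, unfinished tasks are dropped."""
    pool = multiprocessing.Pool(2)
    results = [pool.apply_async(slow_square, (i, 5)) for i in range(4)]
    pool.terminate()  # stop the workers now, without waiting for tasks
    pool.join()
    # get() on a result that never completed may block indefinitely,
    # so only report whether each task finished.
    return [r.ready() for r in results]


if __name__ == "__main__":
    print(graceful_shutdown())
    print(forced_shutdown())
```

In the forced case none of the results should be ready, which is why the sketch checks ready() instead of calling get().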
The solution is quite simple: define the 'killer' function inside main, so that it has access to the pool object and can call pool.terminate() on it.
The full code looks like this:
    import multiprocessing


    class MyClass:
        ids_list = range(300)

        def main(self):
            pool = multiprocessing.Pool(3)

            # Defined inside main so that it can see the local `pool`.
            def kill_pool(err_msg):
                print(err_msg)
                pool.terminate()

            for osm_id in self.ids_list:
                pool.apply_async(self.handle_country_or_region,
                                 kwds=dict(country_id=osm_id),
                                 error_callback=kill_pool)
            pool.close()
            pool.join()

        def handle_country_or_region(self, country_id=None, queue=None):
            pass  # here I do some task
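For anyone who wants to try the idea in isolation, below is a self-contained sketch of the same pattern. Everything in it is made up for the demonstration: handle_item stands in for the real task, FAILING_ID is a hypothetical id that triggers the exception, and the 0.1 second sleep just simulates work. It assumes all tasks are submitted before the first error arrives.

```python
import multiprocessing
import time

FAILING_ID = 5  # hypothetical id whose task raises


def handle_item(item_id):
    """Stand-in for the real task; fails for FAILING_ID."""
    if item_id == FAILING_ID:
        raise ValueError("cannot process id %d" % item_id)
    time.sleep(0.1)
    return item_id


def run_until_first_error(ids):
    """Run handle_item for all ids, terminating the pool on the first error."""
    pool = multiprocessing.Pool(3)
    errors = []

    def kill_pool(exc):
        # Called in the parent process with the exception raised by the task.
        errors.append(exc)
        pool.terminate()

    results = [pool.apply_async(handle_item, (i,), error_callback=kill_pool)
               for i in ids]
    pool.close()
    pool.join()  # returns once the pool has finished or has been terminated
    finished = [r.get() for r in results if r.ready() and r.successful()]
    return errors, finished


if __name__ == "__main__":
    errors, finished = run_until_first_error(range(20))
    print("errors:", errors)
    print("finished before termination:", finished)
```

Only results that are ready are read back, because a task that was dropped by terminate() never completes its result object.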
If anyone needs to use a queue, below is an extended variant of the code that shows how to finish the queue correctly, so that no zombie processes are left behind:
    import multiprocessing
    import os
    import pickle


    class MyClass:
        ids_list = list(range(300))  # a list, because range has no remove()
        folder = os.path.join(os.getcwd(), 'app_geo')
        STOP_TOKEN = 'stop queue'

        def main(self):
            # >>> Queue part shared between processes <<<
            manager = multiprocessing.Manager()
            remove_id_queue = manager.Queue()
            remove_id_process = multiprocessing.Process(
                target=self.remove_id_from_file,
                args=(remove_id_queue,))
            remove_id_process.start()
            # >>> End of queue part <<<

            pool = multiprocessing.Pool(3)

            def kill_pool(err_msg):
                print(err_msg)
                pool.terminate()

            for osm_id in self.ids_list:
                # The queue must be passed explicitly, otherwise the
                # worker receives queue=None.
                pool.apply_async(self.handle_country_or_region,
                                 kwds=dict(country_id=osm_id,
                                           queue=remove_id_queue),
                                 error_callback=kill_pool)
            pool.close()
            pool.join()

            # >>> Anti-zombie processes queue part <<<
            remove_id_queue.put(self.STOP_TOKEN)
            remove_id_process.join()
            manager.shutdown()
            # >>> End of anti-zombie part <<<

        def handle_country_or_region(self, country_id=None, queue=None):
            # here I do some task
            queue.put(country_id)

        def remove_id_from_file(self, some_queue):
            while True:
                osm_id = some_queue.get()
                if osm_id == self.STOP_TOKEN:
                    return
                self.ids_list.remove(osm_id)
                with open(os.path.join(self.folder, 'ids_list.pickle'), 'wb') as f:
                    pickle.dump(self.ids_list, f)
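The stop-token part can also be tried on its own. The following is a reduced sketch of that idea with hypothetical names (collect_until_stop, run_consumer) and without the pool or the pickle file; it only shows a consumer process that exits when it reads the sentinel, and a parent that joins it afterwards.

```python
import multiprocessing

STOP_TOKEN = "stop queue"  # same sentinel idea as STOP_TOKEN in the code above


def collect_until_stop(in_queue, out_queue):
    """Consume items until the sentinel arrives, then report what was seen."""
    seen = []
    while True:
        item = in_queue.get()
        if item == STOP_TOKEN:
            out_queue.put(seen)
            return
        seen.append(item)


def run_consumer(items):
    """Feed items to a consumer process and shut it down cleanly."""
    with multiprocessing.Manager() as manager:
        in_queue = manager.Queue()
        out_queue = manager.Queue()
        consumer = multiprocessing.Process(target=collect_until_stop,
                                           args=(in_queue, out_queue))
        consumer.start()
        for item in items:
            in_queue.put(item)
        in_queue.put(STOP_TOKEN)  # ask the consumer to finish
        consumer.join()           # reap it so that no zombie is left behind
        return out_queue.get(), consumer.exitcode


if __name__ == "__main__":
    print(run_consumer([1, 2, 3]))
```

Using the manager as a context manager shuts it down automatically, which plays the same role as the explicit manager.shutdown() call in the full code.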