Can't stop/kill all processes at once produced by multiprocessing.Pool

I need to stop/kill all processes when there is any error/exception. I found a solution on Stack Overflow that kills all processes using psutil, but from time to time I have an issue: when psutil kills the child and main processes, new processes may start and the code continues to execute.

import os
import multiprocessing

import psutil

class MyClass:
    parent_pid = 0
    ids_list = range(300)

    def main(self):
        self.parent_pid = os.getpid()
        pool = multiprocessing.Pool(3)

        for osm_id in self.ids_list:
            pool.apply_async(self.handle_country_or_region,  
                             kwds=dict(country_id=osm_id),
                             error_callback=self.kill_proc_tree)

        pool.close()
        pool.join()

    def kill_proc_tree(self, including_parent=True):
        parent = psutil.Process(self.parent_pid)
        children = parent.children(recursive=True)

        for child in children:
            child.kill()
        psutil.wait_procs(children, timeout=5)

        if including_parent:
            parent.kill()
            parent.wait(5)

    def handle_country_or_region(self, country_id=None, queue=None):
        pass
        # here I do some task

It seems that I need to terminate the pool rather than kill the processes, but in this case, if I do

pool.close()
pool.terminate()
pool.join()

my terminal stops responding: the new line is completely empty (i.e. without ">>>") and nothing happens.

Ideally I want the following flow: if there is any error/exception, stop/kill all code execution and return to the interactive prompt in the terminal.

Can anyone help me make it work properly? I use Python 3.5 and Ubuntu 15.10.

asked Apr 04 '16 13:04 by TitanFighter


1 Answer

The solution is quite simple: put the 'killer' function inside main, so it closes over the pool and can call pool.terminate() on it.

The full code looks like this:

import multiprocessing

class MyClass:
    ids_list = range(300)

    def main(self):
        pool = multiprocessing.Pool(3)

        def kill_pool(err_msg):
            print(err_msg)
            pool.terminate()

        for osm_id in self.ids_list:
            pool.apply_async(self.handle_country_or_region,     
                             kwds=dict(country_id=osm_id),
                             error_callback=kill_pool)

        pool.close()
        pool.join()

    def handle_country_or_region(self, country_id=None, queue=None):
        pass  # here I do some task
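To see the pattern in isolation, here is a minimal, self-contained sketch (not the author's application code; the work function, the failing input, and the small sleep are made-up illustration details): the first exception raised in a worker triggers error_callback, which terminates the pool and drops the remaining queued tasks.

```python
import multiprocessing
import time

def work(n):
    # Hypothetical task: one input fails on purpose to trigger error_callback.
    time.sleep(0.05)  # slow the workers down a little, so tasks remain queued
    if n == 5:
        raise ValueError('boom on %d' % n)
    return n * n

def run():
    results = []
    pool = multiprocessing.Pool(3)

    def kill_pool(err):
        # Runs in the pool's result-handler thread in the parent process.
        print('terminating pool:', err)
        pool.terminate()

    for n in range(100):
        pool.apply_async(work, (n,),
                         callback=results.append,
                         error_callback=kill_pool)
    pool.close()
    pool.join()
    return results

if __name__ == '__main__':
    done = run()
    print('%d of 100 tasks finished before termination' % len(done))
```

Because terminate() is called as soon as task 5 fails, most of the 100 submitted tasks never run, and control returns to the caller after pool.join().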

If anyone needs to use a queue, below is an extended variant of the code, which shows how to finish the queue correctly and avoid zombie processes:

import pickle
import os
import multiprocessing

class MyClass:
    ids_list = list(range(300))  # a list (not a range), so remove() below works
    folder = os.path.join(os.getcwd(), 'app_geo')
    STOP_TOKEN = 'stop queue'

    def main(self):

        # >>> Queue part shared between processes <<<
        manager = multiprocessing.Manager()
        remove_id_queue = manager.Queue()

        remove_id_process = multiprocessing.Process(target=self.remove_id_from_file,
                                                    args=(remove_id_queue,))
        remove_id_process.start()
        # >>> End of queue part <<<

        pool = multiprocessing.Pool(3)

        def kill_pool(err_msg):
            print(err_msg)
            pool.terminate()

        for osm_id in self.ids_list:
            pool.apply_async(self.handle_country_or_region,
                             kwds=dict(country_id=osm_id, queue=remove_id_queue),
                             error_callback=kill_pool)

        pool.close()
        pool.join()

        # >>> Anti-zombie processes queue part <<<
        remove_id_queue.put(self.STOP_TOKEN)
        remove_id_process.join()
        manager.shutdown()
        # >>> End

    def handle_country_or_region(self, country_id=None, queue=None):
        # here I do some task
        queue.put(country_id)

    def remove_id_from_file(self, some_queue):
        while True:
            osm_id = some_queue.get()
            if osm_id == self.STOP_TOKEN:
                return
            self.ids_list.remove(osm_id)
            with open(os.path.join(self.folder, 'ids_list.pickle'), 'wb') as f:
                pickle.dump(self.ids_list, f)
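The anti-zombie part boils down to the sentinel ('stop token') pattern: the consumer process loops on queue.get() until it sees the token, so the parent can join() it cleanly instead of leaving it blocked forever. A minimal, self-contained sketch of just that pattern (the names are illustrative, not from the answer's application code):

```python
import multiprocessing

STOP_TOKEN = 'stop queue'

def consumer(queue, out):
    # Drain the queue until the sentinel arrives, then return, so that
    # join() on this process comes back instead of hanging.
    while True:
        item = queue.get()
        if item == STOP_TOKEN:
            return
        out.append(item)

def run():
    manager = multiprocessing.Manager()
    queue = manager.Queue()
    out = manager.list()  # shared list, so the parent can see the items
    proc = multiprocessing.Process(target=consumer, args=(queue, out))
    proc.start()

    for i in range(5):
        queue.put(i)
    queue.put(STOP_TOKEN)  # tell the consumer it is done

    proc.join()            # returns promptly; no zombie process left behind
    items = list(out)      # copy before the manager goes away
    manager.shutdown()
    return items

if __name__ == '__main__':
    print(run())
```

With a single producer and a single consumer the FIFO queue preserves order, so run() hands back the five items exactly as they were put in.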
answered Oct 27 '22 01:10 by TitanFighter