Python dynamic multiprocessing and signalling issues

I have a Python multiprocessing setup (i.e. worker processes) with custom signal handling, which prevents the workers from cleanly using multiprocessing themselves (see the extended problem description below).

The Setup

The master class that spawns all worker processes looks like the following (stripped down to the important parts).

It re-binds its own signals only to print Master teardown; since the received signals are inherited down the process tree, they must be handled by the workers themselves. This is achieved by re-binding the signals after the workers have been spawned.

import logging
import signal
import sys

log = logging.getLogger(__name__)


class Midlayer(object):
    def __init__(self, nprocs=2):
        self.nprocs = nprocs
        self.procs = []

    def handle_signal(self, signum, frame):
        log.info('Master teardown')
        for p in self.procs:
            p.join()
        sys.exit()

    def start(self):
        # Start the desired number of workers
        for _ in range(self.nprocs):
            p = Worker()
            self.procs.append(p)
            p.start()

        # Bind signals for the master AFTER workers have been spawned and started
        signal.signal(signal.SIGINT, self.handle_signal)
        signal.signal(signal.SIGTERM, self.handle_signal)

        # Serve forever, only exit on signals
        for p in self.procs:
            p.join()

The worker class subclasses multiprocessing.Process and implements its own run() method.

In this method, it connects to a distributed message queue and polls the queue for items forever, where "forever" means: until the worker receives SIGINT or SIGTERM. The worker should not quit immediately; instead, it has to finish whatever calculation it is doing and will quit afterwards (once quit_req is set to True).

import signal
from multiprocessing import Process


class Worker(Process):
    def __init__(self):
        self.quit_req = False
        Process.__init__(self)

    def handle_signal(self, signum, frame):
        print('Stopping worker (pid: {})'.format(self.pid))
        self.quit_req = True

    def run(self):
        # Set signals for the worker process
        signal.signal(signal.SIGINT, self.handle_signal)
        signal.signal(signal.SIGTERM, self.handle_signal)

        q = connect_to_some_distributed_message_queue()

        # Start consuming
        print('Starting worker (pid: {})'.format(self.pid))
        while not self.quit_req:
            message = q.poll()
            if len(message):
                try:
                    print('{} handling message "{}"'.format(
                        self.pid, message))
                    # Facade pattern: pick the correct target function for
                    # the requested message and execute it.
                    MessageRouter.route(message)
                except Exception as e:
                    print('{} failed handling "{}": {}'.format(
                        self.pid, message, e))

The Problem

So much for the basic setup, where (almost) everything works fine:

  • The master process spawns the desired number of workers
  • Each worker connects to the message queue
  • Once a message is published, one of the workers receives it
  • The facade pattern (using a class named MessageRouter) routes the received message to the respective function and executes it

Now for the problem: Target functions (where the message gets directed to by the MessageRouter facade) may contain very complex business logic and thus may require multiprocessing.

If, for example, the target function contains something like this:

from multiprocessing import Pool

nproc = 4
# Spawn a pool, because we have an expensive calculation here
p = Pool(processes=nproc)
# Collect result proxy objects for async apply calls to 'some_expensive_calculation'
rpx = [p.apply_async(some_expensive_calculation, ()) for _ in range(nproc)]
# Collect the results from all processes
res = [r.get(timeout=.5) for r in rpx]
# Print all results
print(res)

Then the processes spawned by the Pool will also redirect their signal handling for SIGINT and SIGTERM to the worker's handle_signal function (because signal handlers are inherited by the process subtree), essentially printing Stopping worker (pid: ...) and not stopping at all. I know that this happens because I re-bound the signals for the worker before its own child processes were spawned.

This is where I'm stuck: I simply cannot re-set the worker's signal handlers after it spawns its own child processes, because I do not know whether it spawns any (the target functions are opaque to the worker and may be written by others), and because the worker stays (by design) in its poll loop. At the same time, I cannot expect a target function that uses multiprocessing to re-bind its own signal handlers to (whatever) default values.

Currently, I feel like restoring the signal handlers in each loop iteration in the worker (before the message is routed to its target function) and resetting them after the function has returned is the only option, but it simply feels wrong.
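
For reference, that workaround would look roughly like this around the routing call (just a sketch; signal.signal() returns the previous handler, which makes saving and restoring easy):

# Inside the worker's poll loop: restore default dispositions so that
# any children forked by the target function inherit sane handlers,
# then re-bind the worker's own handlers afterwards.
old_int = signal.signal(signal.SIGINT, signal.SIG_DFL)
old_term = signal.signal(signal.SIGTERM, signal.SIG_DFL)
try:
    MessageRouter.route(message)
finally:
    signal.signal(signal.SIGINT, old_int)
    signal.signal(signal.SIGTERM, old_term)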

Am I missing something? Do you have any advice? I'd be really happy if someone could give me a hint on how to fix the flaws in my design here!

asked Nov 18 '16 by jbndlr



1 Answer

There is no clean approach for tackling the issue in the way you want to proceed. I often find myself in situations where I have to run unknown code (represented as Python entry-point functions which might get down into some C weirdness) in multiprocessing environments.

This is how I approach the problem.

The main loop

Usually the main loop is pretty simple: it fetches a task from some source (HTTP, pipe, RabbitMQ queue...) and submits it to a pool of workers. I make sure the KeyboardInterrupt exception is correctly handled to shut down the service.

import logging

# get_next_task() and service are placeholders for the task source and
# the worker-pool wrapper, respectively.
try:
    while True:
        task = get_next_task()
        service.process(task)
except KeyboardInterrupt:
    service.wait_for_pending_tasks()
    logging.info("Sayonara!")

The workers

The workers are managed by a pool, either multiprocessing.Pool or concurrent.futures.ProcessPoolExecutor. If I need more advanced features such as timeout support, I use either billiard or pebble; an example follows below.
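
For example, pebble's ProcessPool supports a per-task timeout; a minimal sketch (expensive() is just a stand-in workload):

from pebble import ProcessPool

def expensive(x):
    return x * x

if __name__ == '__main__':
    with ProcessPool(max_workers=4) as pool:
        future = pool.schedule(expensive, args=(3,), timeout=5)
        # result() raises TimeoutError if the task exceeds 5 seconds
        print(future.result())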

Each worker is set to ignore SIGINT, as is commonly recommended for pool workers; SIGTERM is left at its default.
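
With a plain multiprocessing.Pool, this can be done via the pool's initializer argument (a minimal sketch; init_worker is a name of my choosing):

import signal
from multiprocessing import Pool

def init_worker():
    # Pool children inherit the parent's signal dispositions, so reset
    # SIGINT to "ignore"; only the main process reacts to Ctrl+C.
    signal.signal(signal.SIGINT, signal.SIG_IGN)

if __name__ == '__main__':
    pool = Pool(processes=4, initializer=init_worker)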

The service

The service is controlled either by systemd or supervisord. In either case, I make sure that the termination request is always delivered as a SIGINT (Ctrl+C).

I want to keep SIGTERM as an emergency shutdown rather than relying only on SIGKILL for that. SIGKILL is not portable and some platforms do not implement it.
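
With supervisord, for example, the stop signal can be configured per program (a sketch; the program name and command are placeholders):

; supervisord: deliver SIGINT instead of the default SIGTERM on stop
[program:myservice]
command=/usr/bin/python /opt/myservice/main.py
stopsignal=INT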

"I whish it was that simple"

If things are more complex, I'd consider the use of frameworks such as Luigi or Celery.

In general, reinventing the wheel on such things is quite detrimental and brings little gratification, especially if someone else will have to look at that code.

Of course, this does not apply if your aim is to learn how these things are done.

answered Sep 29 '22 by noxdafox