How to correctly shut down Python RQ worker processes dynamically?

Using Python RQ, we are trying to dynamically manage worker processes. We use a custom-built worker script, which (in simplified form) is as follows:

from rq import Connection, Worker

# get_queues_to_listen_on() and get_worker_connection() are our own
# helpers returning the queue names and the Redis connection to use.
queues_to_listen_on = get_queues_to_listen_on()

with Connection(connection=get_worker_connection()):
    w = Worker(queues_to_listen_on)
    w.work()

We are particularly interested in the shutdown of workers. Our main concern is how to shut down a worker gracefully, so that any work in progress is finished before the worker exits. The request_stop(...) signal handler on the appropriate Worker object seems to do what we need, but (at least to my knowledge) there is no way of invoking it other than pressing CTRL+C in the terminal where the worker process is running.

As I see it, there are two probable solutions (there could definitely be more) - in order of preference:

  1. Programmatically, using the rq library, send the signal to request_stop and thus trigger graceful shutdown.
  2. Somehow acquire the pid of the correct process (not sure if the workhorse process or the worker listener process) and using some other method, send the appropriate signal to that process. We have some ways this could be done, but it would very probably require more work and introduce other variables to the problem that I would prefer be left out (for example, using Fabric to run a remote command or something along those lines).
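For option 2, the signal itself is simple to send once the PID of the worker listener process is known: when `work()` is running, rq has installed `request_stop` as the handler for SIGINT/SIGTERM, so delivering SIGINT programmatically is equivalent to pressing CTRL+C. A minimal sketch, assuming you have already recorded the worker's PID somewhere the controlling process can read it (the function name here is illustrative, not part of rq):

```python
import os
import signal

def request_warm_shutdown(worker_pid):
    """Ask an RQ worker listener process to stop after its current job.

    Sending SIGINT is equivalent to pressing CTRL+C in the worker's
    terminal: the handler rq installs (request_stop) performs a warm
    shutdown, letting the job in progress finish first.
    """
    os.kill(worker_pid, signal.SIGINT)
```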

If there is a better way to solve this problem or a different alternative that would achieve the same goal, I would appreciate your suggestions.

Juan Carlos Coto asked Feb 27 '13

1 Answer

Option 1 is definitely better in terms of design.

However, to solve your particular issue of having to use CTRL+C to exit the process (I hate that too), you can use the following strategy for your workers:

# WORKER_NAME.py
import atexit
import os

PID = os.getpid()

@atexit.register
def clean_shut():
    print("Clean shut performed")

    try:
        os.unlink("WORKER_NAME.%d" % PID)
    except OSError:
        pass  # sentinel file already removed

# Worker main loop: keep working while the sentinel file exists
def main():
    with open("WORKER_NAME.%d" % PID, "w") as f:
        f.write("Delete this to end WORKER_NAME gracefully")

    while os.path.exists("WORKER_NAME.%d" % PID):
        pass  # Worker working

Then, in your master script, get the worker PID as @Borys suggested, and os.unlink("path/to/WORKER_NAME.%d" % worker_PID) to trigger the graceful shutdown :)
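Concretely, the master side could look like the sketch below (the directory argument and PID discovery are placeholders for however you track your workers):

```python
import os

def stop_worker(worker_pid, directory="."):
    # Removing the sentinel file makes the worker's while-loop condition
    # fail, so it exits cleanly after finishing its current iteration.
    sentinel = os.path.join(directory, "WORKER_NAME.%d" % worker_pid)
    if os.path.exists(sentinel):
        os.unlink(sentinel)
```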

This only applies to workers running an indefinite loop, though. If the worker process calls something that blocks, even a plain sequential one-time job, you have to trace further into the possibly blocking routine and resolve it there, for example by applying some sort of time-out strategy.
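One such time-out strategy can be sketched with signal.alarm. This is Unix-only and only works in the main thread; the names are illustrative, not taken from rq (which has its own job timeout mechanism):

```python
import signal

class JobTimeout(Exception):
    """Raised when a call exceeds its allotted time."""

def run_with_timeout(func, seconds):
    # Arrange for SIGALRM to interrupt a blocking call after `seconds`.
    # Unix-only, and only valid in the main thread.
    def _raise_timeout(signum, frame):
        raise JobTimeout()

    old_handler = signal.signal(signal.SIGALRM, _raise_timeout)
    signal.alarm(seconds)
    try:
        return func()
    finally:
        signal.alarm(0)  # cancel any pending alarm
        signal.signal(signal.SIGALRM, old_handler)
```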

woozyking answered Oct 23 '22