Notify celery task of worker shutdown

I am using Celery 2.4.1 with Python 2.6, the RabbitMQ backend, and Django. I would like my task to be able to clean up properly if the worker shuts down. As far as I am aware you cannot supply a task destructor, so I tried hooking into the worker_shutdown signal.

Note: AbortableTask only works with the database backend, so I can't use that.

from celery.task import task
from celery.signals import worker_shutdown

@task
def mytask(*args):
    obj = DoStuff()

    def shutdown_hook(*args):
        print "Worker shutting down"
        # clean up nicely
        obj.stop()

    worker_shutdown.connect(shutdown_hook)

    # blocking call that monitors a network connection
    obj.stuff()

However, the shutdown hook never gets called. Ctrl-C'ing the worker doesn't kill the task and I have to manually kill it from the shell.

So if this is not the proper way to go about it, how do I allow tasks to shutdown gracefully?

asked Nov 15 '11 by dgorissen



1 Answer

worker_shutdown is only sent by the MainProcess, not the child pool workers. All worker_* signals except worker_process_init refer to the MainProcess.

However, the shutdown hook never gets called. Ctrl-C'ing the worker doesn't kill the task and I have to manually kill it from the shell.

The worker never terminates a task under normal (warm) shutdown. Even if a task takes days to complete, the worker won't finish shutting down until the task completes. You can set --soft-time-limit or --time-limit to tell the instance when it's OK to terminate the task.
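The soft time limit works by raising an exception inside the running task, which the task can catch to clean up before the hard limit kills it. On Unix, this is built on an alarm signal. Below is a minimal stdlib sketch of that idea; the names run_with_soft_limit and SoftTimeLimitReached are invented for illustration, and in a real task you would catch celery.exceptions.SoftTimeLimitExceeded instead:

```python
import signal


class SoftTimeLimitReached(Exception):
    """Stand-in for celery.exceptions.SoftTimeLimitExceeded."""


def _raise_soft_limit(signum, frame):
    # Invoked by SIGALRM; interrupts whatever the task is doing.
    raise SoftTimeLimitReached()


def run_with_soft_limit(work, seconds, cleanup):
    """Run work(); if it exceeds `seconds`, interrupt it and call cleanup()."""
    old_handler = signal.signal(signal.SIGALRM, _raise_soft_limit)
    signal.alarm(seconds)
    try:
        work()
    except SoftTimeLimitReached:
        cleanup()  # the task gets a chance to clean up nicely
    finally:
        signal.alarm(0)  # cancel any pending alarm
        signal.signal(signal.SIGALRM, old_handler)
```

In a Celery task the equivalent is a plain try/except around the blocking call, with the worker started using --soft-time-limit so the exception is actually delivered.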

So to add any kind of cleanup you first need to make sure the tasks can actually complete, since the cleanup won't be called before that happens.

To add a cleanup step to the pool worker processes you can use something like:

from celery import platforms
from celery.signals import worker_process_init

def cleanup_after_tasks(signum, frame):
    # reentrant code here (see http://docs.python.org/library/signal.html)
    pass

def install_pool_process_sighandlers(**kwargs):
    platforms.signals["TERM"] = cleanup_after_tasks
    platforms.signals["INT"] = cleanup_after_tasks

worker_process_init.connect(install_pool_process_sighandlers)
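Outside of Celery, the same pattern can be sketched with only the stdlib: a child process installs a SIGTERM handler right after it starts (the analogue of worker_process_init) and runs its cleanup when the parent terminates it. The names child and on_term are invented for this sketch:

```python
import multiprocessing as mp
import os
import signal
import time


def child(cleaned):
    def on_term(signum, frame):
        cleaned.value = 1  # record that cleanup ran
        os._exit(0)

    # Analogue of worker_process_init: set up handlers at process start.
    signal.signal(signal.SIGTERM, on_term)
    while True:  # stand-in for a long-running task
        time.sleep(0.1)


if __name__ == "__main__":
    cleaned = mp.Value("i", 0)
    p = mp.Process(target=child, args=(cleaned,))
    p.start()
    time.sleep(0.3)   # give the child time to install its handler
    p.terminate()     # sends SIGTERM, like a warm shutdown of a pool process
    p.join(timeout=2)
    print(cleaned.value)
```

The handler only sets a flag and exits here; in practice this is where the "reentrant code" from the answer's snippet would go.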
answered Oct 10 '22 by asksol