
Check if celery warm shutdown is in progress from a task

Tags: python, celery

TL;DR

Is there a way to tell if our celery worker is going into a warm shutdown? In other words, can I check if there's a SIGTERM pending? I have a task that reschedules itself, but I'd like to avoid rescheduling myself if there's a shutdown pending to avoid holding up the warm shutdown. Something like this:

if not self.shutdown_pending():
    self.retry(countdown=5, max_retries=3)

Actually, rescheduling stuff aside, I'd like to be able to cleanly bail out of current work as soon as I get a SIGTERM so that I can restart my workers as quickly as possible on a new code deploy:

@app.task(bind=True)
def my_work_task(self):
    work = get_work()
    for item in work:
        if self.shutdown_pending():
            logger.info("Shutdown detected. Bailing.")
            return
        item.process()

Background

I have a task that takes a variable amount of time (anywhere from a few seconds to a few minutes). I'm using a minutely celery-beat schedule to invoke the task initially, but if I get just a small amount of work that only takes, say, ten seconds to complete then I'd like to immediately re-invoke the task a few times in order to avoid waiting 50 seconds for the next celery beat to come in since new work will most likely become available during that time period.

All of this is to minimize the latency of my work items being handled. I want to avoid that 50 second period where the worker is sitting there doing nothing since some work might have become available during that time. Note that the work becomes "ready" based on an "expiration" of items in the database, which is why I'm using celery beat to just sweep things up as they become available and not triggering the task directly.
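For context, the beat side of this setup might look like the following sketch. None of this appears in the question, so the app name, broker URL, and task path are all assumptions; the only point is the minutely schedule that sweeps up expired items.

```python
# Hypothetical beat configuration assumed by the question: a minutely
# entry that invokes the sweep task. App name, broker, and task path
# are placeholders.
from celery import Celery
from celery.schedules import crontab

app = Celery("myapp", broker="redis://localhost:6379/0")  # assumed broker

app.conf.beat_schedule = {
    "sweep-expired-work": {
        "task": "tasks.my_work_task",
        "schedule": crontab(),  # crontab() with no args fires every minute
    },
}
```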

My task looks something like this:

@app.task(bind=True)
def my_work_task(self):
    work = get_work()
    do_some_work(work)
    # if this was just a short bit of work reschedule ourselves
    # immediately to avoid wasting time waiting for the
    # next celery beat.
    # Note: self.retry() raises Retry, so nothing after this line runs.
    if len(work) < SMALL_WORK_THRESHOLD:
        self.retry(countdown=5, max_retries=3)

This all works fine except for one thing: when I reload my workers (by sending SIGTERM) I can end up waiting on a single task that keeps rescheduling itself, possibly with large slices of work each time. Each invocation can take several minutes, until I hit my max_retries value. This makes deploying new code a problem, since work processing nearly grinds to a halt for up to a few minutes.

asked Nov 09 '22 15:11 by mgalgs

1 Answer

Sadly there isn't an easy solution, as mentioned in this other similar question.

The only things you can do are change your approach or use a SIGKILL. In that case, make sure to use the task results backend so you can tell which tasks you may have lost; if you already keep task status in your database, you may not need the backend at all.
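The SIGKILL escalation could be wrapped in a small helper like the sketch below. `stop_worker` is a hypothetical name, not part of Celery: it sends SIGTERM for a warm shutdown, waits out a grace period, and only then falls back to SIGKILL.

```python
import signal
import subprocess

def stop_worker(proc, grace_seconds=30):
    """Warm shutdown first: send SIGTERM and wait up to grace_seconds.
    If the process is still running after that, escalate to SIGKILL."""
    proc.send_signal(signal.SIGTERM)
    try:
        proc.wait(timeout=grace_seconds)
        return "terminated"
    except subprocess.TimeoutExpired:
        proc.kill()   # SIGKILL: cannot be caught or ignored
        proc.wait()   # reap the process
        return "killed"
```

In a real deployment you would aim this at the worker's parent process (e.g. via its pidfile) rather than a `subprocess.Popen` handle, but the TERM-then-KILL escalation is the same.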

In my personal experience I always used MongoDB to record the start and the end of each task. This let me see tasks that never completed due to machine crashes (I was using CELERY_ACKS_LATE), and also perform global locking when I wanted a certain task to run only one at a time across the entire cloud. That way, if SIGTERM wasn't working after a certain time, I could send a SIGKILL without risk of losing work.
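The start/end bookkeeping described above can be sketched like this. A plain dict stands in for MongoDB here, and all the names (`TaskRegistry`, `mark_start`, `try_lock`) are made up for illustration; in production the store would need atomic operations so concurrent workers don't race.

```python
import time

class TaskRegistry:
    """Records when tasks start and finish so that tasks lost to a
    SIGKILL or machine crash can be found and replayed. A dict stands
    in for the MongoDB collection a real deployment would use."""

    def __init__(self):
        self._tasks = {}
        self._locks = {}

    def mark_start(self, task_id):
        self._tasks[task_id] = {"started": time.time(), "finished": None}

    def mark_end(self, task_id):
        self._tasks[task_id]["finished"] = time.time()

    def incomplete(self):
        # Tasks that started but never finished: candidates for replay.
        return [tid for tid, t in self._tasks.items() if t["finished"] is None]

    def try_lock(self, name):
        # Global lock so a task runs only one at a time. With MongoDB
        # this would be an atomic upsert rather than a dict check.
        if self._locks.get(name):
            return False
        self._locks[name] = True
        return True

    def release_lock(self, name):
        self._locks.pop(name, None)
```

A task would call `mark_start` on entry and `mark_end` on exit; a periodic job (or the deploy script) would then inspect `incomplete()` after a hard kill.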

Hope this helps

answered Nov 14 '22 21:11 by Mauro Rocco