
Celery: trying to shut down the worker by raising SystemExit in the task_postrun signal, but it always hangs and the main process never exits

I'm trying to shut down the main Celery process by raising SystemExit() in the task_postrun signal. The signal fires just fine and the exception is raised, but the worker never completely exits; it just hangs.

HOW DO I MAKE THIS WORK?

Am I forgetting some setting somewhere?

Below is the code that I'm using for the worker (worker.py):

from celery import Celery
from celery import signals

app = Celery('tasks',
    set_as_current=True,
    broker='amqp://guest@localhost//',
    backend='mongodb://localhost//',
)

app.config_from_object({
    "CELERYD_MAX_TASKS_PER_CHILD": 1,
    "CELERYD_POOL": "solo",
    "CELERY_SEND_EVENTS": True,
    "CELERYD_CONCURRENCY": 1,
    "CELERYD_PREFETCH_MULTIPLIER": 1,
})

def shutdown_worker(**kwargs):
    print("SHUTTING DOWN WORKER (RAISING SystemExit)")
    raise SystemExit()

import tasks

signals.task_postrun.connect(shutdown_worker)

print("STARTING WORKER")
app.worker_main()
print("WORKER EXITED!")

and below is the code for tasks.py:

import time

from celery import Celery
from celery.task import Task

class Blah(Task):
    track_started = True
    acks_late = False

    def run(self, config):
        time.sleep(5)
        return "SUCCESS FROM BLAH"

def get_app():
    celery = Celery('tasks',
        broker='amqp://guest@localhost//',
        backend="mongodb://localhost//"
    )
    return celery

To test this, I first run the worker (python worker.py), then queue up a task like so:

>>> import tasks
>>> results = []
>>> results.append(tasks.Blah.delay({}))

The output I see from the worker is this:

STARTING WORKER

 -------------- celery@hostname v3.0.9 (Chiastic Slide)
---- **** ----- 
--- * ***  * -- [Configuration]
-- * - **** --- . broker:      amqp://guest@localhost:5672//
- ** ---------- . app:         tasks:0x26f3050
- ** ---------- . concurrency: 1 (solo)
- ** ---------- . events:      ON
- ** ---------- 
- *** --- * --- [Queues]
-- ******* ---- . celery:      exchange:celery(direct) binding:celery
--- ***** ----- 

[2012-11-06 15:59:16,761: WARNING/MainProcess] celery@hostname has started.
[2012-11-06 15:59:21,785: WARNING/MainProcess] SHUTTING DOWN WORKER (RAISING SystemExit)

I expected app.worker_main() to return, the script to print WORKER EXITED!, and the process to exit completely. This never happens: the worker process has to be killed with kill -KILL {PID} before it goes away, and while it hangs it cannot consume any other tasks either.

I guess my main question would be:

HOW DO I MAKE THE CODE RETURN FROM app.worker_main()?

I'd like to be able to completely restart the worker process (by having the process COMPLETELY exit) after X number of tasks have been executed.
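
To make the "after X tasks" part concrete, here is a minimal sketch of a counting handler. MAX_TASKS and make_shutdown_handler are hypothetical names that are not part of the code above, and the sketch only shows the counting logic. Raising SystemExit from the handler is still subject to the hang described in this question.

```python
import itertools

MAX_TASKS = 3  # hypothetical limit, stands in for "X"


def make_shutdown_handler(max_tasks):
    """Return a handler that raises SystemExit once max_tasks tasks have finished."""
    finished_counter = itertools.count(1)

    def shutdown_after_limit(**kwargs):
        # Signal handlers receive keyword arguments only, so accept them all.
        finished = next(finished_counter)
        if finished >= max_tasks:
            print("SHUTTING DOWN WORKER after %d tasks" % finished)
            raise SystemExit()

    return shutdown_after_limit


# Keep a module-level reference to the handler so it is not garbage collected.
handler = make_shutdown_handler(MAX_TASKS)

# In worker.py this would take the place of shutdown_worker:
# signals.task_postrun.connect(handler)
```

Calling the handler directly, without Celery, is enough to check the counting: the first MAX_TASKS - 1 calls return normally and the last one raises SystemExit.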

UPDATE: I figured out what the worker is hanging on. The worker (WorkController) hangs on a call to self.stop after it catches the SystemExit exception.

Darryl Lickt asked Nov 06 '12 22:11


1 Answer

Hate it when I end up answering my own question.

Anyway, it was blocking on a join call on the Mediator component inside the WorkController: the WorkController calls stop() on the Mediator component, and inside stop() the Mediator is joined.
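
The general pattern behind this kind of hang can be reproduced with the standard library alone. The sketch below is not Celery's Mediator code, and join_reports_hang is a hypothetical helper. It starts a thread that never finishes and shows that a join on it is still blocked after the timeout.

```python
import threading


def join_reports_hang(timeout=0.2):
    """Return True if the background thread is still running after join(timeout)."""
    never_set = threading.Event()
    # daemon=True so this demo thread cannot keep the interpreter alive.
    background = threading.Thread(target=never_set.wait, daemon=True)
    background.start()
    # A plain join() with no timeout would block forever here, because the
    # thread is waiting on an event that nothing ever sets.
    background.join(timeout)
    return background.is_alive()


if __name__ == "__main__":
    print("join still blocked:", join_reports_hang())
```

A stop() method that calls join() without a timeout on such a thread would never return, which matches the symptom seen in the worker.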

I got rid of the Mediator component by disabling all rate limits (I would have expected this to be the default, but it is not).

You can disable all rate limits with the setting:

CELERY_DISABLE_RATE_LIMITS: True
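
As a sketch of where this goes, the configuration from worker.py in the question could be extended as shown below. WORKER_CONFIG is a hypothetical name, and the spelling used here is the one listed in the Celery 3.x configuration reference, CELERY_DISABLE_RATE_LIMITS. I have not verified this against other Celery versions.

```python
# Settings from worker.py in the question, plus the rate-limit switch.
WORKER_CONFIG = {
    "CELERYD_MAX_TASKS_PER_CHILD": 1,
    "CELERYD_POOL": "solo",
    "CELERY_SEND_EVENTS": True,
    "CELERYD_CONCURRENCY": 1,
    "CELERYD_PREFETCH_MULTIPLIER": 1,
    # Disables rate limiting for all tasks, which is what removes the Mediator.
    "CELERY_DISABLE_RATE_LIMITS": True,
}

# In worker.py this dict would be passed the same way as in the question:
# app.config_from_object(WORKER_CONFIG)
```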

Hope this helps somebody else down the road too.

Peace

Darryl Lickt answered Oct 11 '22 15:10