
celery shutdown worker after particular task

Tags: python, celery

I'm using celery (solo pool with concurrency=1) and I want to be able to shut down the worker after a particular task has run. A caveat is that I want to avoid any possibility of the worker picking up any further tasks after that one.

Here's my attempt, in outline:

from __future__ import absolute_import, unicode_literals

from celery import Celery
from celery.exceptions import WorkerShutdown
from celery.signals import task_postrun

app = Celery()
app.config_from_object('celeryconfig')

@app.task
def add(x, y):
    return x + y

@task_postrun.connect(sender=add)
def shutdown(*args, **kwargs):
    raise WorkerShutdown()

However, when I run the worker

celery -A celeryapp  worker --concurrency=1 --pool=solo 

and run the task

add.delay(1,4) 

I get the following:

 -------------- celery@sam-APOLLO-2000 v4.0.2 (latentcall)
---- **** -----
--- * ***  * -- Linux-4.4.0-116-generic-x86_64-with-Ubuntu-16.04-xenial 2018-03-18 14:08:37
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         __main__:0x7f596896ce90
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 4 (solo)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[2018-03-18 14:08:39,892: WARNING/MainProcess] Restoring 1 unacknowledged message(s)

The task is re-queued and will be run again on another worker, leading to a loop.

This also happens when I move the WorkerShutdown exception into the task itself.

@app.task
def add(x, y):
    print(x + y)
    raise WorkerShutdown()

Is there a way I can shut down the worker after a particular task, while avoiding this unfortunate side-effect?

asked Mar 18 '18 by samfrances


People also ask

How do you stop a Celery worker?

In this setup, when running a celery worker with the --pool solo option, kill -KILL <pid> stops the worker immediately.
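For reference, a minimal Python sketch of sending those signals to the worker's main process (the pid value here is hypothetical; in practice you would read it from a pidfile or ps):

import os
import signal

worker_pid = 12345  # hypothetical pid of the worker's main process

os.kill(worker_pid, signal.SIGTERM)    # warm shutdown: finish running tasks, then exit
# os.kill(worker_pid, signal.SIGKILL)  # immediate kill, as with `kill -KILL <pid>`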

What is Soft_time_limit in Celery?

The time limit is set as two values, soft and hard. The soft time limit allows the task to catch an exception and clean up before it is killed; the hard time limit is not catchable and forcibly terminates the task.
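For example, a minimal sketch of catching the soft limit (soft_time_limit, time_limit, and SoftTimeLimitExceeded are standard Celery; process() and cleanup() are hypothetical placeholders):

from celery.exceptions import SoftTimeLimitExceeded

@app.task(soft_time_limit=10, time_limit=20)
def long_running(items):
    try:
        return [process(item) for item in items]  # process() is a placeholder
    except SoftTimeLimitExceeded:
        cleanup()  # the soft limit is catchable, so we can tidy up here
        raise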

What is concurrency in Celery?

As for --concurrency, Celery by default uses multiprocessing to perform concurrent execution of tasks. The number of worker processes/threads can be changed using the --concurrency argument; it defaults to the number of available CPUs if not set.
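The same settings can also live in the config file; a sketch mirroring the command line used in this question (worker_concurrency and worker_pool are the standard Celery 4 setting names):

# celeryconfig.py
worker_concurrency = 1  # same as --concurrency=1
worker_pool = 'solo'    # same as --pool=solo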

What is Apply_async in Celery?

This way, you delegate queue creation to Celery. You can use apply_async with any queue and Celery will handle it, provided your task is aware of the queue used by apply_async. If none is provided, the worker will listen only on the default queue.
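For example, a short sketch using the add task from the question (the queue name is arbitrary):

# Route this call to a specific queue; a worker must consume that queue,
# e.g. one started with `-Q priority`.
add.apply_async(args=(1, 4), queue='priority')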


2 Answers

The recommended process for shutting down a worker is to send the TERM signal. This will cause the celery worker to shut down after completing any currently running tasks. If you send a QUIT signal to the worker's main process instead, the worker will shut down immediately.

The celery docs usually discuss this in terms of managing celery from the command line or via systemd/initd, but celery also provides a remote worker control API via celery.app.control.
You can revoke a task to prevent workers from executing it, which should prevent the loop you are experiencing. Further, control supports shutting down a worker in this manner as well.

So I imagine the following will get you the behavior you desire.

@app.task(bind=True)
def shutdown(self):
    app.control.revoke(self.request.id)  # prevent this task from being executed again
    app.control.shutdown()               # send shutdown signal to all workers

Since it's not currently possible to ack the task from within the task and then continue executing it, this use of revoke circumvents the problem: even if the task is queued again, the new worker will simply ignore it.
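One way to wire this up, as a sketch (chain and the immutable si() signatures are standard Celery; add and shutdown are the tasks from this question and answer):

from celery import chain

# Run add(1, 4) first, then the shutdown task; si() makes the shutdown
# signature immutable so it ignores add's result.
chain(add.si(1, 4), shutdown.si()).apply_async()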

Alternatively, the following would also prevent a redelivered task from being executed a second time...

from celery.exceptions import Ignore

@app.task(bind=True)
def some_task(self):
    if self.request.delivery_info['redelivered']:
        raise Ignore()  # ignore if this task was redelivered
    print('This should only execute on first receipt of task')

It is also worth noting that AsyncResult has a revoke method that calls self.app.control.revoke for you.
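For instance, a short sketch with the add task from the question:

result = add.delay(1, 4)
result.revoke()  # equivalent to app.control.revoke(result.id)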

answered Sep 20 '22 by sytech


If you shut down the worker after the task has completed, it won't be re-queued.

@task_postrun.connect(sender=add)
def shutdown(*args, **kwargs):
    app.control.broadcast('shutdown')

This will gracefully shut down the worker after the task is completed.

[2018-04-01 18:44:14,627: INFO/MainProcess] Connected to redis://localhost:6379/0
[2018-04-01 18:44:14,656: INFO/MainProcess] mingle: searching for neighbors
[2018-04-01 18:44:15,719: INFO/MainProcess] mingle: all alone
[2018-04-01 18:44:15,742: INFO/MainProcess] celery@foo ready.
[2018-04-01 18:46:28,572: INFO/MainProcess] Received task: celery_worker_stop.add[ac8a65ff-5aad-41a6-a2d6-a659d021fb9b]
[2018-04-01 18:46:28,585: INFO/ForkPoolWorker-4] Task celery_worker_stop.add[ac8a65ff-5aad-41a6-a2d6-a659d021fb9b] succeeded in 0.005628278013318777s: 3
[2018-04-01 18:46:28,665: WARNING/MainProcess] Got shutdown from remote

Note: broadcast will shut down all workers. If you want to shut down a specific worker, start the worker with a name:

celery -A celeryapp  worker -n self_killing --concurrency=1 --pool=solo 

Now you can shut down this specific worker using the destination parameter:

app.control.broadcast('shutdown', destination=['celery@self_killing']) 
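Putting the two together, a sketch of the signal handler targeting only that named worker (assuming the worker was started with -n self_killing as above):

@task_postrun.connect(sender=add)
def shutdown(*args, **kwargs):
    # Only the named worker receives the shutdown command.
    app.control.broadcast('shutdown', destination=['celery@self_killing'])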
answered Sep 19 '22 by Pandikunta Anand Reddy