I have around 10,000 scheduled tasks on my current celery setup. I didn't realize what scheduled tasks were and decided to use them to send follow-up emails months in advance.
Looking back, it's probably never a good idea to schedule a task more than an hour into the future: every time you restart a worker it has to re-receive every scheduled task from RabbitMQ, and they all just sit in memory.
My problem is that revoking a task doesn't delete it. The task stays in memory, and the revoke queue now contains the task's ID. When the task comes up for execution, Celery checks whether it has been revoked and, if so, discards it at that point.
However, the task still stays in memory until then, and if I restart my worker at any time, the revoke queue is cleared because I didn't make it persistent.
How do I permanently remove a task from my Celery worker? I essentially just need to send an acknowledgement back to RabbitMQ so that it removes the task once and for all and it doesn't come back when I restart Celery.
I've looked in the docs and source code and tried to do it myself in the shell but I can't figure out the proper place for acking a task to rabbitMQ and then popping it forever.
1. To properly purge the queue of waiting tasks you MUST first stop all the workers (http://celery.readthedocs.io/en/latest/faq.html#i-ve-purged-messages-but-there-are-still-messages-left-in-the-queue):
$ sudo rabbitmqctl stop
or (in case RabbitMQ/message broker is managed by Supervisor):
$ sudo supervisorctl stop all
2. ...and then purge the tasks from a specific queue:
$ cd <source_dir>
$ celery amqp queue.purge <queue name>
3. Start RabbitMQ again (rabbitmqctl has no start command, so use the system service):
$ sudo service rabbitmq-server start
or (in case RabbitMQ is managed by Supervisor):
$ sudo supervisorctl start all
If you have only been using one queue or one task, this is easy:
From the docs:
Answer: You can use the celery purge command to purge all configured task queues:
$ celery -A proj purge
or programmatically:
>>> from proj.celery import app
>>> app.control.purge()
1753
If you only want to purge messages from a specific queue you have to use the AMQP API or the celery amqp utility:
$ celery -A proj amqp queue.purge <queue name>
The number 1753 is the number of messages deleted.
You can also start the worker with the --purge argument to purge messages when the worker starts.
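For the AMQP API route mentioned above, here is a minimal sketch of purging a single queue from Python. It assumes the same `app` object as in the docs excerpt; `purge_queue` is a hypothetical helper name, and the queue name you pass in is your own. Keep in mind that messages already reserved by a running worker are not touched by a purge, which is why the workers need to be stopped first.

```python
def purge_queue(app, queue_name):
    """Delete all messages waiting in one queue and return how many were removed."""
    # connection_for_write() opens a connection to the app's configured broker.
    with app.connection_for_write() as conn:
        # queue_purge issues AMQP queue.purge; "or 0" covers a None reply.
        return conn.default_channel.queue_purge(queue_name) or 0
```

Usage would look something like `purge_queue(app, "<queue name>")` after `from proj.celery import app`.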
Update: If you have multiple queues or tasks
I don't know of any way to edit them in RabbitMQ since the server is not built to access/edit/delete queued tasks that way, but you could always disable your task in the code:
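# Keep the original name and decorator so already-scheduled messages still match.
@task
def my_old_task():
    pass  # intentionally does nothing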
This way all the tasks will still run as scheduled but won't do anything; since they are neither renamed nor deleted, you won't get any errors.
Obviously you should update your code to stop scheduling these tasks. After a while, there won't be any more tasks of this type scheduled, so you can delete the code.
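To judge when the old task can be deleted, one option is to count how many instances the workers are still holding. The sketch below is only an illustration: `count_scheduled` is a hypothetical helper, and it assumes its input has the shape returned by `app.control.inspect().scheduled()`, i.e. a dict mapping each worker name to a list of entries whose `request` dict carries the task `name`.

```python
from collections import Counter


def count_scheduled(scheduled, task_name):
    """Return {worker: count} of scheduled (ETA) tasks named task_name.

    `scheduled` is assumed to look like the result of
    app.control.inspect().scheduled().
    """
    counts = Counter()
    # inspect() gives None when no worker replies, so treat that as empty.
    for worker, entries in (scheduled or {}).items():
        for entry in entries:
            if entry.get("request", {}).get("name") == task_name:
                counts[worker] += 1
    return dict(counts)
```

For example, `count_scheduled(app.control.inspect().scheduled(), "proj.tasks.my_old_task")`, where the task name is an assumed module path. An empty dict means no running worker currently holds one of these tasks; messages still waiting in the broker are not included.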