I have millions of tasks reserved in Celery (ETA not due yet) and every time I want to update my Celery code base, I have to restart it, which cuts the connection to RabbitMQ and causes RabbitMQ to redistribute tasks again (I am using late ack).
Is it possible to reload new code base but still keep my reserved tasks? I am using Celery with Django.
Short answer: yes, you can, but you have to write your own queue draining logic.
Longer answer: when you want to do a code update (depending on how you handle deployments), you have to use the Celery remote control API to tell all your workers to stop consuming tasks. RabbitMQ brokers support the remote control API, so you're in luck.
from my_app.celery import app
inspector = app.control.inspect()
controller = app.control
# get a list of current workers
workers = inspector.ping()
active_queues = inspector.active_queues()
all_queues = set()
for worker, queues in active_queues.items():
    for queue in queues:
        all_queues.add(queue['name'])

for queue in all_queues:
    controller.cancel_consumer(queue)
This will stop your workers from consuming tasks. Now you have to monitor your workers until they have finished processing any active tasks.
import time
done = False
while not done:
    active = inspector.active()
    active_count = sum(len(tasks) for tasks in active.values())
    done = active_count == 0
    if not done:
        time.sleep(60)  # wait a minute between checks
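The active-task count inside that loop can be factored into a small helper you can unit-test without a broker. This is an illustrative helper (not part of Celery); it assumes the mapping shape that `inspect().active()` returns, namely worker hostname to list of task dicts, or None when no worker replies:

```python
def count_active(active):
    """Total number of in-flight tasks across all workers.

    `active` is the dict returned by app.control.inspect().active():
    worker hostname -> list of task descriptions (or None if no
    worker replied to the broadcast).
    """
    if not active:  # inspect() returns None when no worker replies
        return 0
    return sum(len(tasks) for tasks in active.values())

print(count_active({'w1@host': [{'id': 'a'}], 'w2@host': []}))  # → 1
```

Guarding against None matters in practice: if every worker is busy or down when the broadcast goes out, `inspector.active()` returns None and the bare `active.values()` call in the loop above would raise.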
Once your workers are done, you are clear to deploy your code without having to worry about losing tasks.
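After the deploy, the workers still hold their reserved ETA tasks but are no longer consuming new ones, so you need to undo the earlier step. A minimal sketch of resuming consumption, assuming you kept the same `all_queues` set from before; `add_consumer` is the remote-control counterpart of `cancel_consumer`:

```python
def resume_consumers(controller, queue_names):
    """Tell every worker to start consuming the given queues again."""
    for queue in queue_names:
        # Broadcasts to all workers, like cancel_consumer did.
        controller.add_consumer(queue)

# Usage after the deploy, with the same app and all_queues as above:
# resume_consumers(app.control, all_queues)
```

Alternatively, simply restarting the workers after the deploy has the same effect, since a freshly started worker consumes from its configured queues by default.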