I need to route all tasks of a certain django site instance to a certain queue. My setup is as following:
DJANGO_SETTINGS_MODULE
, with a different CELERY_DEFAULT_QUEUE
setting.On the "celery server", I run multiple worker instances through supervisor (simplified conf):
[program:production_queue]
environment=PYTHONPATH=/pth/to/src/:/pth/to/site-packages/,DJANGO_SETTINGS_MODULE=website.settings.production
command=/pth/to/python celery -A website.celery worker --events --queues myserver --loglevel WARNING --concurrency 4 -n [email protected]
[program:staging_queue]
environment=PYTHONPATH=/pth/to/src/:/pth/to/site-packages/,DJANGO_SETTINGS_MODULE=website.settings.staging
command=/pth/to/python celery -A website.celery worker --events --queues myserver_staging --loglevel WARNING --concurrency 1 -n [email protected]
[program:development_queue]
environment=PYTHONPATH=/pth/to/src/:/pth/to/site-packages/,DJANGO_SETTINGS_MODULE=website.settings.development
command=/pth/to/python celery -A website.celery worker --events --queues myserver_development --loglevel INFO --concurrency 1 -n [email protected]
This works, with inspection:
$ celery -A website.celery inspect activeues
-> [email protected]: OK
* {u'exclusive': False, u'name': u'myserver', u'exchange': {u'name': u'celery', u'durable': True, u'delivery_mode': 2, u'passive': False, u'arguments': None, u'type': u'direct', u'auto_delete': False}, u'durable': True, u'routing_key': u'celery', u'no_ack': False, u'alias': None, u'queue_arguments': None, u'binding_arguments': None, u'bindings': [], u'auto_delete': False}
-> [email protected]: OK
* {u'exclusive': False, u'name': u'myserver_staging', u'exchange': {u'name': u'celery', u'durable': True, u'delivery_mode': 2, u'passive': False, u'arguments': None, u'type': u'direct', u'auto_delete': False}, u'durable': True, u'routing_key': u'celery', u'no_ack': False, u'alias': None, u'queue_arguments': None, u'binding_arguments': None, u'bindings': [], u'auto_delete': False}
-> [email protected]: OK
* {u'exclusive': False, u'name': u'myserver_development', u'exchange': {u'name': u'celery', u'durable': True, u'delivery_mode': 2, u'passive': False, u'arguments': None, u'type': u'direct', u'auto_delete': False}, u'durable': True, u'routing_key': u'celery', u'no_ack': False, u'alias': None, u'queue_arguments': None, u'binding_arguments': None, u'bindings': [], u'auto_delete': False}
(Names accord with the CELERY_DEFAULT_QUEUE settings)
website/celery.py
contains the basics (imports skipped):
app = Celery('proj')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
I would therefore expect tasks generated by a webserver running with the development settings, to end up only in the development_queue
, and so on. However, I see tasks being processed by a different queue, or by all three, which is problematic.
Are my expectations wrong in that this would be a good way to separate these tasks? All documentation on routing is about routing different tasks to different queues, which I don't need. I need to route all tasks of a certain site (environment) to a certain queue. What can I do to separate these environments?
I got an answer from the developer of Celery here: https://github.com/celery/celery/issues/2508, which is:
You have to set all of
CELERY_DEFAULT_QUEUE
,CELERY_DEFAULT_EXCHANGE
andCELERY_DEFAULT_ROUTING_KEY
. Otherwise you will end up with three queues all bound to the same exchange and routing key.Or use the method here which will set it up explicitly: http://docs.celeryproject.org/en/latest/userguide/routing.html#changing-the-name-of-the-default-queue
That works!
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With