I have two separate celeryd processes running on my server, managed by supervisor
. They are set to listen on separate queues as such:
[program:celeryd1]
command=/path/to/celeryd --pool=solo --queues=queue1
...
[program:celeryd2]
command=/path/to/celeryd --pool=solo --queues=queue2
...
And my celeryconfig looks something like this:
from celery.schedules import crontab
BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_DISABLE_RATE_LIMITS = True
CELERYD_CONCURRENCY = 1
CELERY_IGNORE_RESULT = True
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = {
'default': {
"exchange": "default",
"binding_key": "default",
},
'queue1': {
'exchange': 'queue1',
'routing_key': 'queue1',
},
'queue2': {
'exchange': 'queue2',
'routing_key': 'queue2',
},
}
CELERY_IMPORTS = ('tasks', )
CELERYBEAT_SCHEDULE = {
'first-queue': {
'task': 'tasks.sync',
'schedule': crontab(hour=02, minute=00),
'kwargs': {'client': 'client_1'},
'options': {'queue': 'queue1'},
},
'second-queue': {
'task': 'tasks.sync',
'schedule': crontab(hour=02, minute=00),
'kwargs': {'client': 'client_2'},
'options': {'queue': 'queue1'},
},
}
All tasks.sync
tasks must be routed to a specific queue (and therefore celeryd progress). But when I try to run the task manually with sync.apply_async(kwargs={'client': 'value'}, queue='queue1')
both celery workers pick up the task. How can I make the task route to the correct queue and only be run by the worker that is bound to the queue?
Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task, the Celery client adds a message to the queue, and the broker then delivers that message to a worker. The most commonly used brokers are Redis and RabbitMQ.
Celery is a distributed task queue written in Python, which works using distributed messages. Each execution unit in celery is called a task. A task can be executed concurrently on one or more servers using processes called workers.
You are only running one celerybeat instance right?
Maybe you have old queue bindings that clash with this?
Try running rabbitmqctl list_queues
and rabbitmqctl list_bindings
,
maybe reset the data in the broker to start from scratch.
The example you have here should work, and is working for me when I just tried it.
Tip: Since you are using the same exchange and binding_key value as the queue name,
you don't have to explicitly list them in CELERY_QUEUES. When CELERY_CREATE_MISSING_QUEUES
is on (which it is by default) the queues will be automatically created exactly like you have
if you just do celeryd -Q queue1
or send a task to a queue that is undefined.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With