Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Route celery task to specific queue

I have two separate celeryd processes running on my server, managed by supervisor. They are set to listen on separate queues as such:

[program:celeryd1]
command=/path/to/celeryd --pool=solo --queues=queue1
...

[program:celeryd2]
command=/path/to/celeryd --pool=solo --queues=queue2
...

And my celeryconfig looks something like this:

from celery.schedules import crontab

BROKER_URL = "amqp://guest:guest@localhost:5672//"

CELERY_DISABLE_RATE_LIMITS = True
CELERYD_CONCURRENCY = 1
CELERY_IGNORE_RESULT = True

CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = {
    'default': {
        "exchange": "default",
        "binding_key": "default",
    },
    'queue1': {
        'exchange': 'queue1',
        'routing_key': 'queue1',
    },
    'queue2': {
        'exchange': 'queue2',
        'routing_key': 'queue2',
    },
}

CELERY_IMPORTS = ('tasks', )

CELERYBEAT_SCHEDULE = {
    'first-queue': {
        'task': 'tasks.sync',
        'schedule': crontab(hour=02, minute=00),
        'kwargs': {'client': 'client_1'},
        'options': {'queue': 'queue1'},
    },
    'second-queue': {
        'task': 'tasks.sync',
        'schedule': crontab(hour=02, minute=00),
        'kwargs': {'client': 'client_2'},
        'options': {'queue': 'queue1'},
    },
}

All tasks.sync tasks must be routed to a specific queue (and therefore celeryd progress). But when I try to run the task manually with sync.apply_async(kwargs={'client': 'value'}, queue='queue1') both celery workers pick up the task. How can I make the task route to the correct queue and only be run by the worker that is bound to the queue?

like image 352
jmagnusson Avatar asked Apr 09 '12 21:04

jmagnusson


People also ask

How does celery task queue work?

Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task, the Celery client adds a message to the queue, and the broker then delivers that message to a worker. The most commonly used brokers are Redis and RabbitMQ.

Is Celery a task queue?

Celery is a distributed task queue written in Python, which works using distributed messages. Each execution unit in celery is called a task. A task can be executed concurrently on one or more servers using processes called workers.


1 Answers

You are only running one celerybeat instance right?

Maybe you have old queue bindings that clash with this? Try running rabbitmqctl list_queues and rabbitmqctl list_bindings, maybe reset the data in the broker to start from scratch.

The example you have here should work, and is working for me when I just tried it.

Tip: Since you are using the same exchange and binding_key value as the queue name, you don't have to explicitly list them in CELERY_QUEUES. When CELERY_CREATE_MISSING_QUEUES is on (which it is by default) the queues will be automatically created exactly like you have if you just do celeryd -Q queue1 or send a task to a queue that is undefined.

like image 189
asksol Avatar answered Oct 25 '22 04:10

asksol