
Celery dynamic queue creation and routing

I'm trying to call a task, create a queue for that task if it doesn't exist, and then immediately insert the called task into that queue. I have the following code:

@task
def greet(name):
    return "Hello %s!" % name


def run():
    result = greet.delay(args=['marc'], queue='greet.1',
        routing_key='greet.1')
    print result.ready()

Then I have a custom router:

class MyRouter(object):

    def route_for_task(self, task, args=None, kwargs=None):
        if task == 'tasks.greet':
            return {'queue': kwargs['queue'],
                    'exchange': 'greet',
                    'exchange_type': 'direct',
                    'routing_key': kwargs['routing_key']}
        return None

This creates an exchange called greet.1 and a queue called greet.1, but the queue is empty. The exchange should just be called greet, and it should know how to route a routing key like greet.1 to the queue called greet.1.

Any ideas?

asked Aug 09 '12 11:08 by Marconi



1 Answer

When you do the following:

task.apply_async(queue='foo', routing_key='foobar')

Then Celery will take default values from the 'foo' queue in CELERY_QUEUES or, if it does not exist, automatically create it using (queue=foo, exchange=foo, routing_key=foo).

So if 'foo' does not exist in CELERY_QUEUES you will end up with:

queues['foo'] = Queue('foo', exchange=Exchange('foo'), routing_key='foo')

The producer will then declare that queue but, since you override the routing_key, it will actually send the message using routing_key = 'foobar'.
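
The lookup described above can be modeled in a few lines of plain Python. This is a simplified sketch, not Celery's actual implementation: QueueSpec, resolve_destination and the configured_queues dict are hypothetical names standing in for CELERY_QUEUES and the producer's logic.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class QueueSpec:
    """Hypothetical stand-in for a queue definition."""
    name: str
    exchange: str
    routing_key: str


def resolve_destination(configured_queues, queue, routing_key=None):
    """Model of the behavior described above; not Celery's real code.

    Returns the queue that gets declared and the routing key the
    message is actually published with.
    """
    if queue not in configured_queues:
        # Unknown queue: create it with every field defaulting to its name.
        configured_queues[queue] = QueueSpec(queue, exchange=queue, routing_key=queue)
    spec = configured_queues[queue]
    # An explicit routing_key overrides only the publish key, not the binding.
    publish_key = routing_key if routing_key is not None else spec.routing_key
    return spec, publish_key


configured_queues = {}  # plays the role of an empty CELERY_QUEUES
spec, publish_key = resolve_destination(configured_queues, "foo", routing_key="foobar")
print(spec)         # the declared queue: name, exchange and binding key are all 'foo'
print(publish_key)  # the key the message is sent with: 'foobar'
```

Note that on a direct exchange a message published with the key 'foobar' does not match a binding on 'foo', so the declared queue would not receive it.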

This may seem strange but the behavior is actually useful for topic exchanges, where you publish to different topics.
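
To illustrate why a publish key that differs from the queue name is useful with topic exchanges, here is a small sketch of AMQP-style topic matching, where '*' matches exactly one dot-separated word and '#' matches zero or more words. The function name topic_matches and the example keys are made up for illustration; the broker performs this matching itself.

```python
def topic_matches(pattern, routing_key):
    """Return True if an AMQP-style topic pattern matches a routing key.

    Words are separated by dots; '*' matches exactly one word and
    '#' matches zero or more words.
    """
    def match(p, k):
        if not p:
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may absorb zero words, or one word and stay active.
            return match(rest, k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        return (head == "*" or head == k[0]) and match(rest, k[1:])

    return match(pattern.split("."), routing_key.split("."))


# One queue bound with a pattern can receive many different publish keys.
for key in ("greet.1", "greet.2", "greet.1.urgent", "other.1"):
    print(key, topic_matches("greet.*", key), topic_matches("greet.#", key))
```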

It's harder to do what you want, though. You can create the queue yourself and declare it, but that won't work well with automatic message publish retries. It would be better if the queue argument to apply_async could accept a custom kombu.Queue instance that would be both declared and used as the destination. Maybe you could open an issue for that at http://github.com/celery/celery/issues

answered Sep 26 '22 21:09 by asksol