I'm trying to call a task, create a queue for that task if it doesn't exist, and then immediately insert the called task into that queue. I have the following code:
@task
def greet(name):
    return "Hello %s!" % name

def run():
    result = greet.delay(args=['marc'], queue='greet.1',
                         routing_key='greet.1')
    print result.ready()
Then I have a custom router:
class MyRouter(object):

    def route_for_task(self, task, args=None, kwargs=None):
        if task == 'tasks.greet':
            return {'queue': kwargs['queue'],
                    'exchange': 'greet',
                    'exchange_type': 'direct',
                    'routing_key': kwargs['routing_key']}
        return None
This creates an exchange called greet.1 and a queue called greet.1, but the queue is empty. The exchange should just be called greet, and it should know how to route a routing key like greet.1 to the queue called greet.1.

Any ideas?
By default, Celery routes all tasks to a single queue, and all workers consume from that default queue. With Celery queues you can control which workers process which tasks. This is useful if you have slow tasks and fast tasks and you don't want the slow ones to hold up the fast ones, for example by routing them to separate queues as sketched below.
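For example, here is a minimal sketch of that kind of split. The task names tasks.slow_task and tasks.fast_task and the queue names slow/fast are placeholders, and the old-style setting name matches the CELERY_QUEUES setting used later in this post:

# celeryconfig.py -- hypothetical task/queue names, adjust to your project
CELERY_ROUTES = {
    'tasks.slow_task': {'queue': 'slow'},
    'tasks.fast_task': {'queue': 'fast'},
}

# Run one worker per queue so slow tasks cannot hold up fast ones:
#   celery worker -Q slow
#   celery worker -Q fast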
Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task, the Celery client adds a message to the queue, and the broker then delivers that message to a worker. The most commonly used brokers are Redis and RabbitMQ.
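A minimal sketch of pointing a Celery app at a broker (the URLs assume default local RabbitMQ or Redis installations, so adjust them to your environment):

from celery import Celery

# Use whichever broker you run; both URLs assume local defaults.
app = Celery('tasks', broker='amqp://guest:guest@localhost//')   # RabbitMQ
# app = Celery('tasks', broker='redis://localhost:6379/0')       # Redis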
When you do the following:

task.apply_async(queue='foo', routing_key='foobar')

Celery will take the default values from the 'foo' queue defined in CELERY_QUEUES, or, if it does not exist, automatically create it using (queue=foo, exchange=foo, routing_key=foo).

So if 'foo' does not exist in CELERY_QUEUES you will end up with:

queues['foo'] = Queue('foo', exchange=Exchange('foo'), routing_key='foo')

The producer will then declare that queue, but since you override the routing_key, it will actually send the message using routing_key='foobar'.
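To get the layout the question asks for (a direct exchange named greet that routes the key greet.1 to a queue named greet.1), one option is to declare that queue explicitly instead of relying on the automatic default. A sketch, assuming Celery 3.x where CELERY_QUEUES accepts kombu Queue objects:

from kombu import Exchange, Queue

CELERY_QUEUES = (
    # Explicit queue: bound to the 'greet' direct exchange with
    # routing key 'greet.1', so nothing is auto-created.
    Queue('greet.1',
          exchange=Exchange('greet', type='direct'),
          routing_key='greet.1'),
)

# With the queue defined, publishing with the matching routing key
# lands in greet.1:
#   greet.apply_async(args=['marc'], queue='greet.1', routing_key='greet.1')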
This may seem strange but the behavior is actually useful for topic exchanges, where you publish to different topics.
It's harder to do what you want, though. You can create and declare the queue yourself (see the sketch below), but that won't work well with automatic message publish retries. It would be better if the queue argument to apply_async could accept a custom kombu.Queue instead, which would then be both declared and used as the destination. Maybe you could open an issue for that at http://github.com/celery/celery/issues.
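For completeness, declaring the queue yourself (as mentioned above) could look roughly like this with kombu; the broker URL is only an example:

from kombu import Connection, Exchange, Queue

exchange = Exchange('greet', type='direct')
queue = Queue('greet.1', exchange=exchange, routing_key='greet.1')

# The URL assumes a local RabbitMQ with default credentials.
with Connection('amqp://guest:guest@localhost//') as conn:
    # Binding the queue to a channel and declaring it creates the
    # exchange, the queue, and the binding on the broker.
    queue(conn.default_channel).declare()

# After the declaration, a publish to exchange 'greet' with routing
# key 'greet.1' ends up in queue 'greet.1', e.g.:
#   greet.apply_async(args=['marc'], exchange='greet', routing_key='greet.1')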