I'm new to Celery and have a question. I have this simple task:
@app.task(name='test_install_queue')
def test_install_queue():
    return subprocess.call("exit 0", shell=True)
and I call this task later in a test case like this:
result = tasks.test_install_queue.apply_async(queue="install")
The task runs successfully in the install queue (I can see it in the Celery log, and it completes fine). But I would like to know a programmatic way of finding out which queue the task test_install_queue was run in, from the object stored in result.
Thank you!
EDIT:
I've changed the task to look like this:
@app.task(name='test_install_queue', bind=True)
def test_install_queue(self):
    return self.request.__dict__
and then I'm using the result of apply_async as follows:
result = tasks.test_install_queue.apply_async(queue="install")
assert "install" in result.get()["hostname"]
This is only a workaround: it relies on the worker (hostname) having the same name as the only queue that is initialized in that worker.
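A slightly stricter form of that hostname check could be sketched as follows. It assumes the worker was started with a node name equal to the queue name (for example `-n install@%h`), so the hostname looks like `install@some-host`; the helper names `worker_node_name` and `ran_on_queue` are made up for illustration and are not part of Celery.

```python
def worker_node_name(hostname: str) -> str:
    """Return the part of a worker hostname before the '@' (the node name)."""
    return hostname.split("@", 1)[0]


def ran_on_queue(request_dict: dict, queue: str) -> bool:
    """True if the worker that ran the task is named exactly after `queue`.

    `request_dict` is assumed to be the dict returned by the task,
    i.e. the value of result.get() in the test above.
    """
    return worker_node_name(request_dict.get("hostname") or "") == queue
```

Compared with `"install" in hostname`, an exact comparison of the node name avoids false positives such as a worker called `reinstall@...`. It is still a convention-based check, not a real lookup of the queue.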
You can try the following approach:
delivery_info = app.current_task.request.delivery_info
# by default Celery uses the same name for queues and exchanges
original_queue = delivery_info['exchange']
# look for a declared queue bound with the same exchange and routing key
for queue in app.amqp.queues.values():
    if (queue.exchange.name == delivery_info['exchange']
            and queue.routing_key == delivery_info['routing_key']):
        original_queue = queue.name
        break
This approach assumes that you use the default Celery settings and that your exchanges are direct. If you need a more general solution that also covers fanout and topic exchanges, you will have to check the routing keys of every declared queue in app.amqp.queues.
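For fanout and topic exchanges, the matching described above could be sketched roughly like this. The example is plain Python so it can run without a broker: `QueueSpec` is a hypothetical stand-in for the entries of app.amqp.queues, and `topic_matches` and `candidate_queues` are illustrative names, not Celery APIs. The wildcard rules are the standard AMQP ones (`*` matches exactly one word, `#` matches zero or more words).

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass(frozen=True)
class QueueSpec:
    """Hypothetical description of a declared queue and its binding."""
    name: str
    exchange: str
    exchange_type: str  # "direct", "fanout" or "topic"
    routing_key: str    # binding key of the queue


def topic_matches(pattern: str, key: str) -> bool:
    """AMQP topic matching on dot-separated words."""
    def match(p, k):
        if not p:
            return not k
        if p[0] == "#":
            # '#' either consumes nothing, or consumes one word and stays active
            return match(p[1:], k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        if p[0] == "*" or p[0] == k[0]:
            return match(p[1:], k[1:])
        return False
    return match(pattern.split("."), key.split("."))


def candidate_queues(delivery_info: dict, queues: Iterable[QueueSpec]) -> List[str]:
    """Names of all queues whose binding would have received the message."""
    names = []
    for q in queues:
        if q.exchange != delivery_info["exchange"]:
            continue
        if q.exchange_type == "fanout":
            names.append(q.name)  # fanout ignores the routing key
        elif q.exchange_type == "topic":
            if topic_matches(q.routing_key, delivery_info["routing_key"]):
                names.append(q.name)
        elif q.routing_key == delivery_info["routing_key"]:
            names.append(q.name)  # direct: exact match
    return names
```

Note that with fanout or topic exchanges several queues can match the same message, so the result is a list of candidates rather than one definite queue.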
I've just faced this issue myself and was really sceptical about the need for a convoluted solution such as the accepted one from "lexabug". Since even the Celery documentation does not provide an effective alternative, I investigated on my own, using reflection to find out which object holds the information I need, and came up with a simple and straightforward solution. Specifically, I was writing a hook (or rather, a signal in Celery terms), and this is how I retrieved the name of the queue based on the task name:
from celery import Task, signals

@signals.after_task_publish.connect()
def on_task_publish(sender=None, headers=None, body=None, **kwargs):
    # "sender" is a string containing the task name
    # ("celery" here is the Celery app)
    task: Task = celery.tasks.get(sender)
    # once we have the task object, we can access the "queue" property,
    # which contains the name of the queue
    # (it's a dynamic property, so don't expect support from your IDE)
    queue_name: str = task.queue if task is not None else 'unknown'
P.S. I'm using Celery 4.4.
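As a related sketch, the after_task_publish signal also passes `exchange` and `routing_key` keyword arguments, so a handler can record where a message was actually sent instead of looking up the task object. The handler below is written as a plain function so it can be exercised without a broker; the `published` list and the name `record_publish` are hypothetical, and reading the task id from `headers["id"]` assumes the message protocol used by Celery 4.

```python
published = []  # hypothetical in-memory log of published messages


def record_publish(sender=None, headers=None, body=None,
                   exchange=None, routing_key=None, **kwargs):
    """Record where a task message was published.

    `sender` is the task name; `exchange` and `routing_key` describe
    the destination the message was sent to.
    """
    published.append({
        "task": sender,
        "task_id": (headers or {}).get("id"),  # assumption: protocol 2 headers
        "exchange": exchange,
        "routing_key": routing_key,
    })


# To hook it up in a real project (requires Celery):
#   from celery import signals
#   signals.after_task_publish.connect(record_publish)
```

With default direct routing the routing key is typically the queue name, but that is a property of the configuration and not something the signal guarantees.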