
How to get the queue in which a task was run - celery

I'm new to Celery and have a question. I have this simple task:

import subprocess

@app.task(name='test_install_queue')
def test_install_queue():
    return subprocess.call("exit 0", shell=True)

and I call this task later in a test case like this:

result = tasks.test_install_queue.apply_async(queue="install")

The task runs successfully in the install queue (I can see it in the Celery log, and it completes fine). But I would like to know a programmatic way of finding out, from the object stored in result, which queue the task test_install_queue was run in.

Thank you!

EDIT:

I've changed the task to look like this:

@app.task(name='test_install_queue',bind=True)
def test_install_queue(self):
    return self.request.__dict__

and then I'm using the result of apply_async as follows:

result = tasks.test_install_queue.apply_async(queue="install")
assert "install" in result.get()["hostname"]

The workaround relies on the worker (hostname) having the same name as the only queue that is initialized in that worker.
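Returning the whole self.request.__dict__ sends back far more than the assertion needs. Below is a minimal sketch of a helper that keeps only the routing-related fields. It assumes the request mapping carries a "hostname" key and a "delivery_info" mapping with "exchange" and "routing_key" entries, as used elsewhere in this thread; queue_facts is a hypothetical name, not part of Celery.

```python
def queue_facts(request_fields: dict) -> dict:
    """Pick the routing-related fields out of a task request mapping.

    Hypothetical helper: the key names are assumptions taken from this
    thread ("hostname", "delivery_info"), not a documented contract.
    """
    # delivery_info may be missing or None (e.g. when a task is run eagerly)
    delivery_info = request_fields.get("delivery_info") or {}
    return {
        "hostname": request_fields.get("hostname"),
        "exchange": delivery_info.get("exchange"),
        "routing_key": delivery_info.get("routing_key"),
    }
```

Inside the bound task this could be used as return queue_facts(self.request.__dict__), and the test could then assert on result.get()["routing_key"] in addition to the hostname, provided the worker really populates those fields in your setup.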

SnwBr asked Dec 08 '22 09:12


2 Answers

You can try the following approach:

delivery_info = app.current_task.request.delivery_info
# by default Celery uses the same name for queues and exchanges
original_queue = delivery_info['exchange']
for queue in app.amqp.queues.values():  # .itervalues() exists only on Python 2
    if (queue.exchange.name == delivery_info['exchange']
            and queue.routing_key == delivery_info['routing_key']):
        original_queue = queue.name
        break

This approach assumes that you use the default Celery settings and that your exchanges are direct. If you need a more universal solution that also covers fanout and topic exchanges, you will have to check the routing keys of every declared queue in app.amqp.queues.
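To illustrate what checking the routing keys of every declared queue could look like, here is a sketch that works on plain data rather than on Celery objects. QueueDecl, topic_matches and candidate_queues are hypothetical names introduced for this example; in a real application the declarations would be read from app.amqp.queues. The topic matching follows the AMQP convention that "*" stands for exactly one word and "#" for zero or more words.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class QueueDecl:
    """Hypothetical stand-in for one declared queue (not a Celery/kombu class)."""
    name: str
    exchange: str
    exchange_type: str  # "direct", "fanout" or "topic"
    routing_key: str


def topic_matches(pattern: str, key: str) -> bool:
    """AMQP topic match: '*' is exactly one word, '#' is zero or more words."""
    def match(p, k):
        if not p:
            return not k
        if p[0] == "#":
            # '#' may swallow any number of leading words, including none
            return any(match(p[1:], k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        return p[0] in ("*", k[0]) and match(p[1:], k[1:])
    return match(pattern.split("."), key.split("."))


def candidate_queues(delivery_info: dict, queues: list) -> list:
    """Return the names of all declared queues whose binding matches."""
    key = delivery_info["routing_key"]
    names = []
    for q in queues:
        if q.exchange != delivery_info["exchange"]:
            continue
        if (q.exchange_type == "fanout"
                or (q.exchange_type == "direct" and q.routing_key == key)
                or (q.exchange_type == "topic"
                    and topic_matches(q.routing_key, key))):
            names.append(q.name)
    return names
```

Note that the function returns a list: with fanout and topic exchanges, several queues can match the same message, so the delivery info alone may not tell you which of them the worker actually consumed from.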

lexabug answered Dec 11 '22 09:12


I've just faced this issue myself, and I was really sceptical about the need for a convoluted solution such as the accepted one from "lexabug". Since even the Celery documentation does not provide an effective alternative, I investigated on my own, using reflection to find out which object contains the info I need, and I came up with a very simple and straightforward solution. Specifically, I was writing a hook (or rather a signal, in Celery terms), and this is how I retrieved the name of the queue based on the task name:

    @signals.after_task_publish.connect()
    def on_task_publish(sender=None, headers=None, body=None, **kwargs):

        # "sender" is a string containing task name 
        # ("celery" here is the celery app)
        task: Task = celery.tasks.get(sender)

        # once we have the task object, we can access the "queue" property 
        # which contains the name of the queue 
        # (it's a dynamic property so don't expect support by your IDE)
        queue_name: str = task.queue if task is not None else 'unknown'

P.S. I'm using Celery 4.4.

daveoncode answered Dec 11 '22 10:12