
In Celery, how do I get the task status for all tasks with a specific task name?

In Celery, I want to get the status of every task with a specific task name. To do that, I tried the code below.

import celery.events.state

# Create a fresh Celery state instance.
stat = celery.events.state.State()

# tasks_by_type should return the tasks with the given name.
query = stat.tasks_by_type("my_task_name")

# Print the tasks.
print(list(query))

However, this code gives me an empty list.

Hitul Mistry asked Jul 22 '13 09:07

1 Answer

celery.events.state.State() is a data structure used to keep track of the state of Celery workers and tasks. Calling State() yourself gives you an empty state object with no data, which is why the lookup finds no tasks.

You should use app.events.Receiver (stream processing) or celery.events.snapshot (batch processing) to capture state that contains tasks.

Sample Code:

from celery import Celery


def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        # A specific handler takes precedence over the '*' catch-all,
        # so feed the event into state here as well.
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])

        print('TASK FAILED: %s[%s] %s' % (
            task.name, task.uuid, task.info(),))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,
                '*': state.event,
        })
        # Block and process events as they arrive.
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    my_monitor(app)
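
The following is not part of the original answer's code. It is a rough sketch of how the same Receiver pattern could be used to answer the question directly, that is, to keep the latest status of every task with a given name. TaskStatusTracker and watch are hypothetical helpers written for this illustration; only app.connection(), app.events.Receiver, and recv.capture() come from the sample above. It assumes the workers have events enabled (for example, started with -E) and that the monitor is running when a task's task-received event arrives, since that event carries the name.

```python
class TaskStatusTracker:
    """Hypothetical helper: latest event per task for one task name."""

    def __init__(self, task_name):
        self.task_name = task_name
        self.names = {}     # uuid -> task name, learned from events that carry it
        self.statuses = {}  # uuid -> last event seen, e.g. "received", "failed"

    def on_event(self, event):
        group, _, subject = event["type"].partition("-")
        if group != "task":
            return  # ignore worker-online, worker-heartbeat, and so on
        uuid = event["uuid"]
        # Only some events (such as task-received) include the task name.
        if "name" in event:
            self.names[uuid] = event["name"]
        if self.names.get(uuid) == self.task_name:
            self.statuses[uuid] = subject


def watch(app, tracker):
    """Feed every event from the broker into the tracker (blocks forever)."""
    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
            "*": tracker.on_event,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)
```

With a tracker created as TaskStatusTracker("my_task_name") and passed to watch(app, tracker), tracker.statuses maps each task UUID to the most recent event seen for it. Tasks whose task-received event was sent before the monitor started are skipped, because their name is never learned.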

Leonardo.Z answered Nov 15 '22 15:11
