Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to list the queued items in celery?

I have a Django project on an Ubuntu EC2 node, which I have been using to set up an asynchronous using Celery.

I am following http://michal.karzynski.pl/blog/2014/05/18/setting-up-an-asynchronous-task-queue-for-django-using-celery-redis/ along with the docs.

I've been able to get a basic task working at the command line, using:

(env1)ubuntu@ip-172-31-22-65:~/projects/tp$ celery --app=myproject.celery:app worker --loglevel=INFO

I just realized, that I have a bunch of tasks in my queue, that had not executed:

[2015-03-28 16:49:05,916: WARNING/MainProcess] Restoring 4 unacknowledged message(s).
(env1)ubuntu@ip-172-31-22-65:~/projects/tp$ celery -A tp purge
WARNING: This will remove all tasks from queue: celery.
         There is no undo for this operation!

(to skip this prompt use the -f option)

Are you sure you want to delete all tasks (yes/NO)? yes
Purged 81 messages from 1 known task queue.

How do I get a list of the queued items from the command line?

like image 252
user1592380 Avatar asked Mar 28 '15 16:03

user1592380


People also ask

What is Celery in message queue?

Celery is an open source asynchronous task queue or job queue which is based on distributed message passing. While it supports scheduling, its focus is on operations in real time. Celery. Stable release. 5.2.3 / December 29, 2021.


2 Answers

If you want to get all scheduled tasks,

celery inspect scheduled

To find all active queues

celery inspect active_queues

For status

celery inspect stats

For all commands

celery inspect

If you want to get it explicitily.Since you are using redis as queue.Then

redis-cli

>KEYS * #find all keys

Then find out something related to celery

>LLEN KEY # i think it gives length of list
like image 192
itzMEonTV Avatar answered Sep 26 '22 20:09

itzMEonTV


Here is a copy-paste solution for Redis:

def get_celery_queue_len(queue_name):
    from yourproject.celery import app as celery_app
    with celery_app.pool.acquire(block=True) as conn:
        return conn.default_channel.client.llen(queue_name)


def get_celery_queue_items(queue_name):
    import base64
    import json
    from yourproject.celery import app as celery_app

    with celery_app.pool.acquire(block=True) as conn:
        tasks = conn.default_channel.client.lrange(queue_name, 0, -1)

    decoded_tasks = []

    for task in tasks:
        j = json.loads(task)
        body = json.loads(base64.b64decode(j['body']))
        decoded_tasks.append(body)

    return decoded_tasks

It works with Django. Just don't forget to change yourproject.celery.

like image 25
Max Malysh Avatar answered Sep 23 '22 20:09

Max Malysh