 

Detect whether Celery is Available/Running

I'm using Celery to manage asynchronous tasks. Occasionally, however, the Celery process goes down, and then none of the tasks get executed. I would like to check the status of Celery to make sure everything is working, and display an error message to the user if I detect a problem. From the Celery Worker documentation it looks like I might be able to use ping or inspect for this, but ping feels hacky and it's not clear exactly how inspect is meant to be used (should I check whether inspect().registered() is empty?).

Any guidance on this would be appreciated. Basically what I'm looking for is a method like so:

    def celery_is_alive():
        from celery.task.control import inspect
        return bool(inspect().registered())  # is this right??

EDIT: It doesn't even look like registered() is available on celery 2.3.3 (even though the 2.1 docs list it). Maybe ping is the right answer.

EDIT: Ping also doesn't appear to do what I thought it would, so I'm still not sure what the answer is.

Cory asked Dec 14 '11 15:12



2 Answers

Here's the code I've been using. celery.task.control.Inspect.stats() returns a dict containing lots of details about the currently available workers, None if there are no workers running, or raises an IOError if it can't connect to the message broker. I'm using RabbitMQ - it's possible that other messaging systems might behave slightly differently. This worked in Celery 2.3.x and 2.4.x; I'm not sure how far back it goes.

    def get_celery_worker_status():
        ERROR_KEY = "ERROR"
        try:
            from celery.task.control import inspect
            insp = inspect()
            d = insp.stats()
            if not d:
                d = {ERROR_KEY: 'No running Celery workers were found.'}
        except IOError as e:
            from errno import errorcode
            msg = "Error connecting to the backend: " + str(e)
            if len(e.args) > 0 and errorcode.get(e.args[0]) == 'ECONNREFUSED':
                msg += ' Check that the RabbitMQ server is running.'
            d = {ERROR_KEY: msg}
        except ImportError as e:
            d = {ERROR_KEY: str(e)}
        return d
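
A caller can tell the two outcomes apart by looking for the "ERROR" key in the returned dict. The helper below is a minimal sketch of that check, for the "display an error message to the user" part of the question. The name `describe_worker_status` and the message wording are hypothetical; it takes the dict returned by `get_celery_worker_status()` as input and does not talk to Celery itself.

```python
ERROR_KEY = "ERROR"  # assumed to match the key used by get_celery_worker_status()


def describe_worker_status(status):
    """Turn a status dict into an (ok, message) pair suitable for display.

    `status` is assumed to be either {"ERROR": "<message>"} or a mapping of
    worker name -> stats, as returned by get_celery_worker_status().
    """
    if ERROR_KEY in status:
        return False, status[ERROR_KEY]
    workers = sorted(status)  # worker names are the dict keys
    return True, "%d worker(s) online: %s" % (len(workers), ", ".join(workers))
```

The boolean can drive the control flow (for example, refusing to enqueue new tasks), while the message is what gets shown to the user.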
Chris Chamberlin answered Oct 05 '22 14:10


From the documentation of celery 4.2:

    from your_celery_app import app

    def get_celery_worker_status():
        i = app.control.inspect()
        availability = i.ping()
        stats = i.stats()
        registered_tasks = i.registered()
        active_tasks = i.active()
        scheduled_tasks = i.scheduled()
        result = {
            'availability': availability,
            'stats': stats,
            'registered_tasks': registered_tasks,
            'active_tasks': active_tasks,
            'scheduled_tasks': scheduled_tasks
        }
        return result

Of course, you could (and should) improve the code with error handling.
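
One possible shape for that error handling, as a rough sketch rather than a definitive implementation: the function below answers the original `celery_is_alive()` question using only `ping()`. It assumes `app` is a Celery application object with the 4.x-style `app.control.inspect()` API. The `timeout` default and the message strings are illustrative choices. The `except` clause is deliberately broad because the exception raised when the broker is unreachable depends on the transport in use.

```python
def celery_is_alive(app, timeout=1.0):
    """Return an (alive, detail) pair for the given Celery app.

    Assumes `app` exposes app.control.inspect(timeout=...).ping(),
    as Celery 4.x application objects do.
    """
    try:
        # ping() yields {worker_name: {"ok": "pong"}}, or None when no
        # worker replies within the timeout.
        replies = app.control.inspect(timeout=timeout).ping()
    except Exception as exc:  # broad on purpose: the class varies by broker transport
        return False, "Could not reach the broker: %s" % exc
    if not replies:
        return False, "No running Celery workers were found."
    return True, "Workers responding: %s" % ", ".join(sorted(replies))
```

Passing the app in as an argument, instead of importing it inside the function, keeps the check easy to exercise with a stand-in object in tests.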

Ouss answered Oct 05 '22 16:10