
Receiving events from celery task

Tags:

django

celery

I have a long running celery task which iterates over an array of items and performs some actions.

The task should somehow report which item it is currently processing, so the end user is aware of the task's progress.

At the moment my Django app and Celery sit together on one server, so I am able to use Django's models to report the status, but I am planning to add more workers which are away from Django, so they can't reach the DB.

Right now I see a few solutions:

  • Store intermediate results manually in some storage, like Redis or MongoDB, making them available over the network. This worries me a little, because if, for example, I use Redis, then I have to keep the code on the Django side reading the status and the Celery task writing the status in sync, so they use the same keys.
  • Report status back to Django from Celery using REST calls, like PUT http://django.com/api/task/123/items_processed.
  • Maybe use Celery's event system and create events like Item processed, on which Django updates the counter.
  • Create a separate worker which runs on the server with Django and holds a task which only increases the items processed count, so when the main task is done with an item it issues increase_messages_proceeded_count.delay(task_id).

Are there any other solutions, or hidden problems with the ones I mentioned?

Glueon asked Sep 15 '15

2 Answers

There are probably many ways to achieve your goal, but here is how I would do it.

Inside your long running celery task, set the progress using Django's caching framework:

from django.core.cache import cache

@app.task(bind=True)  # bind=True so the task receives `self` with request info
def long_running_task(self, *args, **kwargs):
    key = "my_task:%s" % self.request.id  # the current task's id
    ...
    # do whatever you need to do and set the progress
    # using cache (timeout in seconds -- whatever works for you):
    cache.set(key, progress, timeout=60 * 60)
    ...

Then all you have to do is make a recurring AJAX GET request with that key and retrieve the progress from the cache. Something along these lines:

import json

from django.core.cache import cache
from django.http import HttpResponse

def task_progress_view(request, *args, **kwargs):
    key = request.GET.get('task_key')
    progress = cache.get(key)
    return HttpResponse(content=json.dumps({'progress': progress}),
                        content_type="application/json; charset=utf-8")

One caveat though: if you are running your server as multiple processes, make sure you are using something like memcached, because Django's default local-memory cache is per-process and will be inconsistent among them. Also, I probably wouldn't use celery's task_id as a key, but it is sufficient for demonstration purposes.
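Pointing both the web processes and the Celery workers at one shared memcached instance could look like this (`PyMemcacheCache` is Django's built-in backend, available since Django 3.2; the address is illustrative):

```python
# settings.py -- a shared cache backend, so every web process and
# every Celery worker reads and writes the same progress values.
CACHES = {
    "default": {
        "BACKEND": "django.core.cache.backends.memcached.PyMemcacheCache",
        "LOCATION": "127.0.0.1:11211",
    }
}
```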

lehins answered Nov 15 '22


Take a look at flower - a real-time monitor and web admin for the Celery distributed task queue:

  • https://github.com/mher/flower#api
  • http://flower.readthedocs.org/en/latest/api.html#get--api-tasks

You need it for presentation, right? Flower works with websockets.

For instance - receive task completion events in real-time (taken from official docs):

var ws = new WebSocket('ws://localhost:5555/api/task/events/task-succeeded/');
ws.onmessage = function (event) {
    console.log(event.data);
}

You would likely need to work with tasks ('ws://localhost:5555/api/tasks/').
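If websockets are inconvenient on your side, the same information can be polled over Flower's plain HTTP API (the URL and port are Flower's defaults; the filtering helper below is just a sketch):

```python
import json
from urllib.request import urlopen

def fetch_tasks(base_url="http://localhost:5555"):
    """Fetch Flower's view of all known tasks as a dict keyed by task id."""
    with urlopen(base_url + "/api/tasks") as resp:
        return json.loads(resp.read().decode("utf-8"))

def succeeded_ids(tasks):
    """Pick out the ids of tasks reported as finished successfully."""
    return [tid for tid, info in tasks.items()
            if info.get("state") == "SUCCESS"]
```

A periodic call to `fetch_tasks()` from the Django side would then replace the websocket subscription.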

I hope this helps.

Artur Barseghyan answered Nov 15 '22