
Reporting yielded results of long-running Celery task


Problem

I've segmented a long-running task into logical subtasks, so I can report the results of each subtask as it completes. However, I'm trying to report the results of a task that will effectively never complete (instead yielding values as it goes), and am struggling to do so with my existing solution.

Background

I'm building a web interface to some Python programs I've written. Users can submit jobs through web forms, then check back to see the job's progress.

Let's say I have two functions, each accessed via separate forms:

  • med_func: Takes ~1 minute to execute; its results are passed to render(), which produces additional data.
  • long_func: Returns a generator. Each yield takes on the order of 30 minutes and should be reported to the user. There are so many yields that we can consider this iterator effectively infinite (terminating only when revoked).
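To make the two shapes concrete, here is a minimal sketch of what the functions might look like. The names come from the question; the bodies are hypothetical placeholders, not the actual programs:

```python
def med_func(form):
    """Finite task: roughly a minute of work, then a single return value."""
    return {"input": form, "result": "done"}  # placeholder result


def long_func(form):
    """Effectively infinite generator: a new value every ~30 minutes."""
    n = 0
    while True:
        yield {"input": form, "iteration": n}  # each yield should reach the user
        n += 1
```

The key difference: med_func produces one value and finishes, so its AsyncResult eventually becomes ready; long_func never finishes, so waiting on its result is never an option.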

Code, current implementation

With med_func, I report results as follows:

On form submission, I save an AsyncResult to a Django session:

    task_result = med_func.apply_async([form], link=render.s())
    request.session["task_result"] = task_result

The Django view for the results page accesses this AsyncResult. When a task has completed, results are saved into an object that is passed as context to a Django template.

    def results(request):
        """ Serve (possibly incomplete) results of a session's latest run. """
        session = request.session

        try:  # Load most recent task
            task_result = session["task_result"]
        except KeyError:  # Already cleared, or doesn't exist
            if "results" not in session:
                session["status"] = "No job submitted"
        else:  # Extract data from asynchronous tasks
            session["status"] = task_result.status
            if task_result.ready():
                session["results"] = task_result.get()
                render_task = task_result.children[0]

                # Decorate with rendering results
                session["render_status"] = render_task.status
                if render_task.ready():
                    session["results"].render_output = render_task.get()
                    del request.session["task_result"]  # No longer needed

        return render_to_response('results.html', request.session)

This solution only works when the function actually terminates. I can't chain together logical subtasks of long_func, because there are an unknown number of yields (each iteration of long_func's loop may not produce a result).

Question

Is there any sensible way to access yielded objects from an extremely long-running Celery task, so that they can be displayed before the generator is exhausted?

asked Jun 11 '13 by David Cain
1 Answer

In order for Celery to know what the current state of the task is, it sets some metadata in whatever result backend you have. You can piggy-back on that to store other kinds of metadata.

    from celery import task  # or your app's @app.task decorator
    from celery.result import AsyncResult


    def yielder():
        for i in range(2**100):
            yield i


    @task
    def report_progress():
        for progress in yielder():
            # Set current progress as task metadata in the result backend
            report_progress.backend.mark_as_started(
                report_progress.request.id,
                progress=progress)


    def view_function(request):
        task_id = request.session['task_id']
        task = AsyncResult(task_id)
        progress = task.info['progress']
        # do something with your current progress

I wouldn't throw a ton of data in there, but it works well for tracking the progress of a long-running task.
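The pattern can be seen in isolation with a broker-free toy: a dict stands in for the result backend, the producer records each yielded value as it appears, and a reader polls the latest value the way the view above reads task.info['progress']. All names here (backend, mark_progress, poll) are illustrative stand-ins, not Celery APIs:

```python
# Toy stand-in for the Celery result backend: task_id -> metadata dict.
backend = {}


def mark_progress(task_id, progress):
    """Mimics mark_as_started(): overwrite the task's stored metadata."""
    backend[task_id] = {"status": "STARTED", "progress": progress}


def yielder(n):
    for i in range(n):
        yield i


def run_task(task_id, n):
    """Producer side: record each yielded value as soon as it appears."""
    for progress in yielder(n):
        mark_progress(task_id, progress)


def poll(task_id):
    """Consumer side: read the latest recorded value for a task."""
    return backend[task_id]["progress"]


run_task("abc123", 5)
print(poll("abc123"))  # latest recorded progress
```

Because each update overwrites the previous one, the reader only ever sees the most recent value; if every yielded result must be preserved, they should be written somewhere durable (e.g. a database row per yield) rather than into the task's metadata.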

answered Oct 19 '22 by Paul