
celery - call function on task done

I'm using celery with django and rabbitmq to create a message queue. I also have a worker running on a different machine. In a django view I start a task like this:

def processtask(request, name):
  args = ["ls", "-l"]
  MyTask.delay(args)
  return HttpResponse("Task set to execute.")

My task is configured like this:

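import subprocess
from celery.task import Task  # celery 2.x import path

class MyTask(Task):
  def run(self, args):
    # Run the command and capture its stdout and stderr.
    p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (out, err) = p.communicate()
    return out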

My question is how my django project (which submits the task) can receive the output of the "ls -l" command that the worker executed on its own machine. I guess the best approach would be for the worker to call a function in the django project whenever it's ready to send the output of the executed command.

Eventually I would like to receive the output from the worker asynchronously and then update the webpage with it, but that's for another time. For now I would only like to receive the output from the worker.

Update

Right now I've added an HTTP GET request that is triggered at the end of the task to notify the web application that the task is done. I'm also sending the task_id in the GET request. The request hits a django view, which creates an AsyncResult and gets the result, but when calling result.get() I get the following warning:

/usr/lib64/python2.6/site-packages/django_celery-2.5.1-py2.6.egg/djcelery/managers.py:178: TxIsolationWarning: Polling results with transaction isolation level repeatable-read within the same transaction may give outdated results. Be sure to commit the transaction for each poll iteration.
  "Polling results with transaction isolation level"

Any ideas why? I'm not using a database, because I'm using rabbitmq with AMQP.

Update

I would very much like to use the third option, which seems best for both small and large return values. My whole task looks like this:

class MyTask(Task):
  def __call__(self, *args, **kwargs):
    return self.run(*args, **kwargs)

  def after_return(self, status, retval, task_id, args, kwargs, einfo):
    if self.webhost is not None:
      conn = httplib.HTTPConnection(self.webhost, self.webport)
      conn.request("HEAD", "/vuln/task/output/"+task_id)

  def run(self, args, webhost=None, webport=None):
    self.webhost = webhost
    self.webport = webport
    r = "This is a basic result string used for code clarity"
    return r

So I've overridden the after_return function. By that point the lock on my task should also be released, since the task's run() function has already returned a value. The HEAD request calls a django view, which creates an AsyncResult from the task_id and should provide the result of the task. I've used an arbitrary result string here for testing purposes.

I would like to know why the above code doesn't work. I can use on_success, but I don't think it will make a difference - or will it?

asked Mar 06 '12 00:03 by eleanor


1 Answer

If you look here you will find the following:

Django-celery uses MySQL to keep track of all tasks/results, rabbit-mq is used as a communication bus basically.

What is really happening is that you are trying to fetch the AsyncResult of the task while the task is still running. The task sent an HTTP request to your server, and since the task hasn't returned yet, the worker's db session is still active and the result row is still locked. When Django tries to read the task result (its state and the actual return value of the run function), it finds the row locked and issues the warning.

There are a few ways to go about resolving this:

  1. Set up another celery task to reap the result and chain it to your processing task. That way the original task will finish and release the lock on the db, and the new task will acquire it, read the result in Django and do whatever you need it to do. Look up the celery docs on this.

  2. Don't bother with the db at all, and simply do a POST to Django with the full processing result attached as a payload, rather than trying to fetch it via the db.

  3. Override on_success in your task class and POST your notification request to Django from there, at which point the lock on the db table should be released.

Note that you need to return the whole processing result (no matter how big it is) from the run method (possibly pickled). You didn't mention how big the result can be, so it might make sense to just go with option 2 above (which is what I would do). Alternatively, I would go with option 3. Also, don't forget to handle on_failure in your task as well.
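A rough sketch of how options 2 and 3 could be combined: the task's on_success and on_failure handlers POST the outcome straight to the web application, so the Django side never has to call AsyncResult. This is only an illustration under assumptions, not a drop-in implementation. The names build_notification, post_notification and NotifyingTaskMixin are made up for this example, the JSON payload layout is an assumption, webhost and webport are assumed to be passed as task keyword arguments, and the URL path is borrowed from the question. The handler signatures are the ones celery uses for on_success and on_failure. It is written for Python 3 (http.client instead of httplib).

```python
import http.client
import json


def build_notification(task_id, status, payload):
    """Return (path, body, headers) for the POST that reports a finished task."""
    if isinstance(payload, bytes):
        # subprocess output is bytes on Python 3; JSON needs text.
        payload = payload.decode("utf-8", "replace")
    body = json.dumps({"task_id": task_id, "status": status, "payload": payload})
    # The path reuses the question's /vuln/task/output/<task_id> URL.
    return "/vuln/task/output/" + task_id, body, {"Content-Type": "application/json"}


def post_notification(webhost, webport, task_id, status, payload, timeout=10):
    """POST the notification and return the HTTP status code of the reply."""
    path, body, headers = build_notification(task_id, status, payload)
    conn = http.client.HTTPConnection(webhost, webport, timeout=timeout)
    try:
        conn.request("POST", path, body=body, headers=headers)
        return conn.getresponse().status
    finally:
        conn.close()


class NotifyingTaskMixin:
    """Hypothetical mixin: combine with a celery Task subclass."""

    def on_success(self, retval, task_id, args, kwargs):
        self._notify(task_id, "SUCCESS", retval, kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        self._notify(task_id, "FAILURE", str(exc), kwargs)

    def _notify(self, task_id, status, payload, kwargs):
        # Assumption: webhost/webport arrive as task keyword arguments.
        webhost = kwargs.get("webhost")
        if webhost is not None:
            post_notification(webhost, kwargs.get("webport", 80),
                              task_id, status, payload)
```

With something like `class MyTask(NotifyingTaskMixin, Task)` and a call such as `MyTask.delay(args, webhost="example.org", webport=8000)`, the worker would report back as soon as run() returns or raises. On the Django side, the receiving view would read the JSON from the request body, and since the POST carries no CSRF token the view would likely need to be marked csrf_exempt.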

answered Sep 28 '22 03:09 by enticedwanderer