
Calling async_result.get() from within a celery task

Tags: python, celery

I have a Celery task that calls another, remote task (it belongs to a different Celery app on another server). When I try to .get() the result of that remote task from within my task like this:

@app.task()
def my_local_task():
  result_from_remote = app.send_task('remote_task', [arg1, arg2])
  return result_from_remote.get()

I get this error:

RuntimeWarning: Never call result.get() within a task! See http://docs.celeryq.org/en/latest/userguide/tasks.html#task-synchronous-subtasks

In Celery 3.2 this will result in an exception being
raised instead of just being a warning.

  warnings.warn(RuntimeWarning(E_WOULDBLOCK))

Basically I want my task to be "synchronous": I want it to wait for the result of the remote task, and I'm OK with that.

Can I tell Celery that this is OK? There is the solution of chaining, of course, except it's impossible to chain remote tasks. The only way to call remote tasks is using app.send_task, which returns an AsyncResult, and I can't chain that because I need the task function itself.

Jenian asked Oct 22 '15 11:10


3 Answers

Here is a fragment that hushes the warning if you know what you are doing is safe:

from celery.result import allow_join_result

with allow_join_result():
    result.get()

source

jptknta answered Oct 21 '22 01:10


If you want your task to be synchronous, you can poll ready() in a loop until the result is available:

import time

while not result_from_remote.ready():
    time.sleep(5)

return result_from_remote.get()
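
A polling loop like this never gives up if the remote task never finishes. As a rough sketch, the loop could be wrapped in a helper with a deadline. The name wait_for_result and its timeout and interval parameters are hypothetical, not part of Celery; the helper only assumes an object with ready() and get() methods, such as an AsyncResult. Note that, depending on the Celery version, the final get() inside a task may still trigger the same warning.

```python
import time


def wait_for_result(result, timeout=60.0, interval=1.0):
    """Poll result.ready() until it is True or `timeout` seconds elapse.

    `result` is any object with ready() and get() methods, for example a
    Celery AsyncResult. This helper is hypothetical, not a Celery API.
    """
    deadline = time.monotonic() + timeout
    while not result.ready():
        if time.monotonic() >= deadline:
            raise TimeoutError(f"no result after {timeout} seconds")
        time.sleep(interval)
    return result.get()
```

Inside the task, the loop above would then become return wait_for_result(result_from_remote, timeout=120, interval=5), with the numbers chosen to suit the remote task.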
xbello answered Oct 21 '22 01:10


"There is the solution of chaining, of course, except it's impossible to chain remote tasks. The only way to call remote tasks is using app.send_task, which returns an AsyncResult, and I can't chain that because I need the task function itself."

No, it is possible to chain remote tasks. I've just tried it in a project of mine and it works. I suggest you try it with a trivial test task first to make sure you have the basics down before moving on to something more complex. I've created these tasks:

@app.task
def foo(arg):
    return arg + 1

@app.task
def bar(arg):
    return "I barred " + str(arg)

The two tasks are held in a module named app.tasks. (It is part of a Django project.)

Then I wrote a command that does:

import celery

# Build the chain from task names only; no task functions are imported.
chain = (celery.signature("app.tasks.foo", args=(1,)) |
         celery.signature("app.tasks.bar"))
print(chain.delay().get())

And I got on the screen:

I barred 2
Louis answered Oct 21 '22 02:10