
Celery Result error "args must be a list or tuple"

I am running a Django website and have just gotten Celery to run, but I am getting confusing errors. Here is how the code is structured.

In tests.py:

from tasks import *
from celery.result import AsyncResult

project = Project.objects.create()
# initialize various sub-objects of the project

c = function.delay(project.id)
r = AsyncResult(c.id).ready()
f = AsyncResult(c.id).failed()
# wait until the task is done  
while not r and not f:
    r = AsyncResult(c.id).ready()
    f = AsyncResult(c.id).failed()

self.assertEqual() #will fail because task fails

In tasks.py:

from __future__ import absolute_import
from celery import shared_task

@shared_task
def function(project_id):
    #a bunch of calculations followed by a save of the project
    project = Project.objects.get(project=project_id)

    for part in project.part_set.all():
        partFunction(part.id)
        p = Part.objects.get(id=part.id)
        # add to various variables in project from variables in p
    project.save()

In mainapp/settings.py:

BROKER_URL = "amqp://ipaddress"
CELERY_RESULT_BACKEND='amqp'
CELERY_ACCEPT_CONTENT = ['json','pickle','msgpack','yaml']
CELERY_IGNORE_RESULT = False

The Celery debug console log for the "must be a list or tuple" error:

[INFO/MainProcess] Received task: myapp.tasks.function[id]
[ERROR/MainProcess] Task myapp.tasks.function[id]
    raised unexpected: ValueError('task args must be a list or tuple',)
Traceback:
   File "/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
       R = retval = fun(*args, **kwargs)
   File "/python2.7/site-packages/celery/app/trace.py", line 437, in __protected_call__
       return self.run(*args, **kwargs)
   File "/myapp/tasks.py", line 28, in function
       p = Part.objects.get(id=part.id)
   File "/python2.7/site-packages/celery/app/task.py", line 555, in apply_async
       **dict(self._get_exec_options(), **options)
   File "/python2.7/site-packages/celery/app/base.py", line 351, in send_task
       reply_to=reply_to or self.oid, **options
   File "celery/app/amqp.py", line 252, in publish_task
       raise ValueError('task args must be a list or tuple')
ValueError: task args must be a list or tuple

The error I am getting is as above: AsyncResult(c.id).result is "task args must be a list or tuple". This should be easy to fix, but it is not. When I pass a list instead, like so:

inline = [project.id]
c = function.delay(inline)

It then changes its mind and tells me that AsyncResult(c.id).result is: int() argument must be a string or a number, not 'list'

As you can imagine I am very confused as to what might be the problem.


Edit

tasks.py

@shared_task
def function(app):
    @app.task(name='myapp.function', bind=True)
    def function(project_id):

tests.py

c = function.s(project.id).delay()

function.app prints

DoctorWizard asked Jun 17 '14 19:06



1 Answer

The error is raised by code inside your task, as the traceback shows:

File "/myapp/tasks.py", line 28, in function
   p = Part.objects.get(id=part.id)

Your code looks right, but the traceback shows line 28 of tasks.py leading straight into apply_async, which the line you posted never calls. That suggests the Celery worker still has an old version of the task loaded. It is very important to restart Celery whenever you change anything inside tasks.py (maybe even when you change other files, but I don't think so). That could be the cause of your problem; it has bitten me a couple of times.

Also, there is no reason to fetch each part from the database again with p = Part.objects.get(id=part.id), since you already get the current part instance when iterating over the queryset in for part in project.part_set.all():. This is just a suggestion; you might have more code that needs that step.

As a side note, if it is a task in your project and not part of some reusable app, just use the regular @task decorator; Celery will find it. Make sure you configure the Celery app correctly, though. Another one of my posts can guide you through that: Celery / Django Single Tasks being run multiple times

So as long as you have everything configured correctly, just use it as you did before:

@task #or @shared_task
def function(project_id):
    #a bunch of calculations followed by a save of the project
    project = Project.objects.get(project=project_id)
    ....

Then call it:

result = function.delay(project.id)

or:

result = function.apply_async(args=(project.id,))
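
To see why the two calls in the question fail in different ways, here is a minimal sketch in plain Python that mimics only how Celery packs arguments. FakeTask is a hypothetical stand-in, not Celery's own class, and the body of function is replaced by an int() call standing in for the primary-key lookup. It relies on two things: delay(*args) is the shortcut for apply_async(args), and the publish step rejects args that are not a list or tuple, as the traceback shows.

```python
class FakeTask(object):
    """Hypothetical stand-in that mimics only Celery's argument packing."""

    def __init__(self, fun):
        self.fun = fun

    def apply_async(self, args=None, kwargs=None):
        args = () if args is None else args
        # Same check the traceback shows in celery/app/amqp.py
        if not isinstance(args, (list, tuple)):
            raise ValueError('task args must be a list or tuple')
        # A real worker would run this later; the sketch runs it immediately.
        return self.fun(*args, **(kwargs or {}))

    def delay(self, *args, **kwargs):
        # delay() collects positional arguments into a tuple for apply_async()
        return self.apply_async(args, kwargs)


def function(project_id):
    # Stand-in for the ORM lookup, which expects an integer primary key
    return int(project_id)


task = FakeTask(function)

ok_delay = task.delay(42)                # args becomes (42,)
ok_apply = task.apply_async(args=(42,))  # the same call, spelled out

try:
    task.apply_async(args=42)            # bare int instead of a tuple
except ValueError as exc:
    bare_int_error = str(exc)

try:
    task.delay([42])                     # the list itself becomes project_id
except TypeError as exc:
    list_error = str(exc)
```

In this sketch a bare value handed to apply_async reproduces the first message, while wrapping the id in a list before delay reproduces the int() complaint, because the list arrives inside the task as project_id.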

I'd also recommend testing the task directly by calling it without Celery, as function(project.id), but I'm sure you knew that.
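
A direct test of that kind could look roughly like the sketch below. It assumes the calculation can be exercised as an ordinary function; total_for_parts and its inputs are hypothetical stand-ins for the real task body, which would read from Project and Part. Nothing here touches a broker or a worker, so there is no polling loop to wait on.

```python
import unittest


def total_for_parts(part_values):
    """Hypothetical stand-in for the task body: combines per-part values."""
    return sum(part_values)


class TaskBodyTest(unittest.TestCase):
    def test_runs_synchronously(self):
        # Called like any function, so a failure raises right here
        self.assertEqual(total_for_parts([1, 2, 3]), 6)


# Run the test case programmatically and keep the result object
suite = unittest.defaultTestLoader.loadTestsFromTestCase(TaskBodyTest)
outcome = unittest.TestResult()
suite.run(outcome)
```

If the direct call passes but the queued call still fails, the problem is more likely in how the task is dispatched or in what the worker has loaded than in the calculation itself.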

lehins answered Nov 06 '22 07:11
