I am following the Celery Django tutorial, and the tasks from the example (add, mul) work perfectly for me. I get the correct response when I do res = add.delay(1,2); res.get().
But I get *** NotRegistered: u'pipeline.tasks.sayhello' when I try to execute another task of mine: res = sayhello.delay('trex').
If I do res = sayhello('trex'), I can get the result by just typing res. But this way I execute the function ordinarily, without using Celery.
The task works only if I run it in the Django shell (./manage.py shell):
>>> res = sayhello.delay('trex')
>>> res.get()
u'Hello trex'
So the problem is that I can't execute the sayhello task from pipeline/views.py, but I can execute the add and mul tasks from there.
Why is that? How do I run tasks correctly from views.py?
The full error message:
[2016-11-11 10:56:09,870: ERROR/MainProcess] Received unregistered task of type u'pipeline.tasks.sayhello'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
'[["tiger"], {}, {"chord": null, "callbacks": null, "errbacks": null, "chain": null}]' (84b)
Traceback (most recent call last):
  File "/home/trex/Development/Sirius/new/rocket/rocket-venv/local/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 549, in on_task_received
    strategy = strategies[type_]
KeyError: u'pipeline.tasks.sayhello'
Django version:
Django==1.9.7
Celery version:
celery==4.0.0
django-celery==3.1.17
Django project dir tree:
rocket
├── etl
│ ├── etl
│ │ ├── celery.py
│ │ ├── __init__.py
│ │ ├── settings
│ │ │ ├── base.py
│ │ │ ├── dev.py
│ │ │ ├── __init__.py
│ │ │ ├── production.py
│ │ │ └── test.py
│ │ ├── urls.py
│ │ ├── wsgi.py
│ ├── manage.py
│ ├── pipeline
│ │ ├── __init__.py
│ │ ├── models.py
│ │ ├── tasks.py
│ │ ├── tests.py
│ │ ├── urls.py
│ │ ├── views.py
etl/pipeline/views.py
from .tasks import *

def get_response(request):
    result = add.delay(1, 2)
    result.get()
    result = sayhello.delay('tiger')
    result.get()
etl/pipeline/tasks.py
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def mul(x, y):
    return x * y

@shared_task
def sayhello(name):
    return "Hello %s" % name
Also I tried this:
from celery.decorators import task

@task(name="sayhello")
def sayhello(name):
    return "Hello {0}".format(name)
etl/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'etl.settings.base')

app = Celery('etl')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
etl/__init__.py
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ['celery_app']
etl/settings/base.py
...
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/London'
CELERY_IMPORTS = ('pipeline.tasks', )
To answer your opening questions: As of version 2.0, Celery provides an easy way to start tasks from other tasks. What you are calling "secondary tasks" are what it calls "subtasks".
When a task is ready to be run, Celery puts it on a queue, a list of tasks that are ready to be run. You can have many queues, but we'll assume a single queue here for simplicity. Putting a task on a queue just adds it to a to-do list, so to speak.
Dedicated worker processes constantly monitor task queues for new work to perform. Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task, the client adds a message to the queue; the broker then delivers that message to a worker.
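To make the message flow above concrete, here is a toy model in plain Python. It is only a sketch, not how Celery is implemented: the registry dict, the register decorator, send_task and worker_step are all made-up names, and an in-memory queue stands in for the Redis broker. It does show why a worker that has no entry for a task name can only discard the message, which is what the KeyError in the traceback reflects.

```python
import json
import queue

# Worker-side registry: task name -> function (hypothetical, for illustration).
registry = {}

def register(name):
    """Record a function under a task name, loosely like a task decorator."""
    def decorator(func):
        registry[name] = func
        return func
    return decorator

@register("pipeline.tasks.add")
def add(x, y):
    return x + y

# In-memory stand-in for the broker (Redis in the question).
broker = queue.Queue()

def send_task(name, args):
    """Client side: only the task NAME and arguments travel in the message."""
    broker.put(json.dumps({"task": name, "args": args}))

def worker_step():
    """Worker side: take one message and look the name up in the registry."""
    message = json.loads(broker.get())
    try:
        func = registry[message["task"]]
    except KeyError:
        # The worker never loaded this task, so the message is dropped.
        return "unregistered task: %s" % message["task"]
    return func(*message["args"])

send_task("pipeline.tasks.add", [1, 2])
send_task("pipeline.tasks.sayhello", ["tiger"])  # never registered on the worker

first = worker_step()
second = worker_step()
print(first)
print(second)
```

The point of the sketch is that the client can always enqueue a name, but whether it runs depends entirely on what the worker process has loaded.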
This might help someone: I had modified my code and neglected to restart the Celery worker.
Try restarting the Celery workers.
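The reason a restart matters can be shown without Celery at all: a running Python process keeps the module it imported at startup and does not notice later edits to the file. The sketch below uses only the standard library; toy_tasks is a hypothetical module name written to a temporary directory, and importlib.reload merely stands in for restarting the worker.

```python
import importlib
import os
import sys
import tempfile

sys.dont_write_bytecode = True  # keep .pyc caching out of the demo

tmpdir = tempfile.mkdtemp()
module_path = os.path.join(tmpdir, "toy_tasks.py")  # hypothetical module

# Version 1 of the module: only add() exists.
with open(module_path, "w") as f:
    f.write("def add(x, y):\n    return x + y\n")

sys.path.insert(0, tmpdir)
importlib.invalidate_caches()
toy_tasks = importlib.import_module("toy_tasks")  # "worker startup"

# Edit the file while the process is still running: add sayhello().
with open(module_path, "a") as f:
    f.write("\ndef sayhello(name):\n    return 'Hello %s' % name\n")

# The already-imported module object still reflects version 1.
before_reload = hasattr(toy_tasks, "sayhello")

# Re-importing (here: reload) plays the role of restarting the worker.
importlib.invalidate_caches()
toy_tasks = importlib.reload(toy_tasks)
after_reload = hasattr(toy_tasks, "sayhello")

print(before_reload, after_reload)
```

A shell started after the edit imports the new code, while a worker started before the edit still holds the old module until it is restarted, which would match the behaviour described in the question.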
The error occurs because the CELERY_IMPORTS setting in your etl/settings/base.py file is not taking effect. So my suggestion is:
Remove the comma from
CELERY_IMPORTS = ('pipeline.tasks' , )
And if the issue still persists, then run this command:
celery -A pipeline.tasks worker --loglevel=DEBUG
One more thing: your tasks.py file needs to be in a Django app (one that's registered in settings.py) in order to be imported, so check this point as well. Thanks.
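As a rough illustration of that last point, the settings fragment might look like the following. This is an assumption about the project, not something shown in the question: the django.contrib entries are placeholders, and the only line that matters is 'pipeline', since app.autodiscover_tasks() looks for a tasks.py module in the installed apps.

```python
# etl/settings/base.py (fragment; entries other than 'pipeline' are placeholders)
INSTALLED_APPS = [
    'django.contrib.contenttypes',
    'django.contrib.auth',
    'pipeline',  # the app whose tasks.py should be discovered
]
```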