
Some Celery tasks work, others are NotRegistered

I followed the Celery Django tutorial, and the tasks I see in the example (add, mul) work perfectly for me. I get the correct response when I run res = add.delay(1, 2); res.get().

But I get *** NotRegistered: u'pipeline.tasks.sayhello' when I try to execute my own task: res = sayhello.delay('trex').

If I do res = sayhello('trex'), then I can get the result by just typing res. But that way I execute the function ordinarily, without using Celery.
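
To be clear about the difference: calling the function directly runs it synchronously in the current process, while .delay() only sends a message to the broker and returns an AsyncResult that a worker has to fulfil. A minimal illustration with my sayhello task:

from pipeline.tasks import sayhello

result = sayhello('trex')               # plain function call, Celery not involved
print(result)                           # u'Hello trex'

async_result = sayhello.delay('trex')   # message is sent to the broker queue
print(async_result.get(timeout=10))     # blocks until a worker that has the
                                        # task registered returns the result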

The task only works if I run it in the Django shell (./manage.py shell):

>>> res = sayhello.delay('trex')
>>> res.get()
u'Hello trex'

So the problem is that I can't execute the sayhello task from pipeline/views.py, but I can execute the add and mul tasks from there.

Why is that? How do I run tasks correctly from views.py?

The full error message:

[2016-11-11 10:56:09,870: ERROR/MainProcess] Received unregistered task of type u'pipeline.tasks.sayhello'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
'[["tiger"], {}, {"chord": null, "callbacks": null, "errbacks": null, "chain": null}]' (84b)
Traceback (most recent call last):
  File "/home/trex/Development/Sirius/new/rocket/rocket-venv/local/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 549, in on_task_received
    strategy = strategies[type_]
KeyError: u'pipeline.tasks.sayhello'

Django version:

1.9.7

Celery version:

celery==4.0.0
django-celery==3.1.17

Django project dir tree:

rocket
├── etl
│   ├── etl
│   │   ├── celery.py
│   │   ├── __init__.py
│   │   ├── settings
│   │   │   ├── base.py
│   │   │   ├── dev.py
│   │   │   ├── __init__.py
│   │   │   ├── production.py
│   │   │   └── test.py
│   │   ├── urls.py
│   │   ├── wsgi.py
│   ├── manage.py
│   ├── pipeline
│   │   ├── __init__.py
│   │   ├── models.py
│   │   ├── tasks.py
│   │   ├── tests.py
│   │   ├── urls.py
│   │   ├── views.py

etl/pipeline/views.py

from django.http import HttpResponse
from .tasks import *

def get_response(request):
    result = add.delay(1, 2)
    result.get()
    result = sayhello.delay('tiger')
    return HttpResponse(result.get())

etl/pipeline/tasks.py

from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def mul(x, y):
    return x * y

@shared_task
def sayhello(name):
    return "Hello %s" % name

I also tried this:

from celery.decorators import task

@task(name="sayhello")
def sayhello(name):
    return "Hello {0}".format(name)

etl/etl/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'etl.settings.base')
app = Celery('etl')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

etl/etl/__init__.py

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ['celery_app']

etl/etl/settings/base.py

...
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/London'
CELERY_IMPORTS = ('pipeline.tasks', )
asked Nov 11 '16 by srgbnd



2 Answers

Hopefully this helps someone: I had modified my code and neglected to restart the Celery worker.

Try restarting the Celery workers.
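
For example, from the directory containing manage.py (assuming the app instance is the one defined in the question's etl/etl/celery.py, so the project name is etl):

celery -A etl worker --loglevel=info

The worker builds its task registry by importing task modules once at startup, so a task added or renamed afterwards stays unregistered until the worker is restarted. The startup banner prints every registered task under the [tasks] heading, so you can confirm that pipeline.tasks.sayhello appears after the restart.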

answered by Karuhanga


The error occurs because the CELERY_IMPORTS setting in your etl/settings/base.py is not taking effect. My suggestion is:

Remove the comma from

CELERY_IMPORTS = ('pipeline.tasks', )

If the issue still persists, run this command:

celery -A pipeline.tasks worker --loglevel=DEBUG

One more thing: your tasks.py file needs to be in a Django app (one that's registered in settings.py) in order to be imported, so check that point as well. Thanks.
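
For reference, a minimal sketch of that last point, assuming the pipeline app from the question's directory tree:

# etl/etl/settings/base.py
INSTALLED_APPS = [
    # ... Django and third-party apps ...
    'pipeline',  # app.autodiscover_tasks() only scans apps listed here
]

autodiscover_tasks() looks for a tasks.py module in each app in INSTALLED_APPS; a tasks.py in a package that is not listed there is never imported, so its tasks never reach the worker's registry.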

answered by Prakhar Trivedi