I'm new to celery and was trying to use it in my app. Below is my basic app structure
my_app
|-run.py
|-app
|-mod1
|-mod2
|-tasks
|-__init__.py
|-email
|-other_tasks_file
I want to confine all my background tasks to my tasks module . In the init.py of tasks i have
from celery import Celery
celery = Celery('my_app', broker='redis://localhost:6379/0')
within my tasks/email i have
from app.tasks import celery
@celery.task
def send_email():
#do stuff
from the terminal i start a worker using
celery -A app.tasks worker --loglevel=DEBUG
But my task does not show up in celery's task list . Also, once i run my task from the interpreter like so
>>from app.tasks import email
>>email_result = email.send_email.delay()
When i do this i get the following response in my celery terminal
Received unregistered task of type 'app.tasks.emails.send_email'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you are using relative imports?
Please see url for more information.
The full contents of the message body was:
{'kwargs': {}, 'taskset': None, 'id': '51e8f766-e772-4d85-bad0-5a6774ea541a', 'eta': None, 'timelimit': (None, None), 'args': [], 'retries': 0, 'task': 'app.tasks.emails.send_email', 'utc': True, 'errbacks': None, 'chord': None, 'expires': None, 'callbacks': None} (283b)
Traceback (most recent call last):
File "/usr/local/lib/python3.4/site-packages/celery/app/utils.py", line 235, in find_app
sym = symbol_by_name(app, imp=imp)
File "/usr/local/lib/python3.4/site-packages/celery/bin/base.py", line 492, in symbol_by_name
return symbol_by_name(name, imp=imp)
File "/usr/local/lib/python3.4/site-packages/kombu/utils/__init__.py", line 101, in symbol_by_name
return getattr(module, cls_name) if cls_name else module
AttributeError: 'module' object has no attribute 'tasks'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.4/site-packages/celery/worker/consumer.py", line 456, in on_task_received
strategies[name](message, body,
KeyError: 'app.tasks.channels.send_email'
I am using python 3.4 and celery 3.1.23
All tasks must be imported during Django and Celery startup so that Celery knows about them. If we put them in <appname>/tasks.py files and call app. autodiscover_tasks(), that will do it. Or we could put our tasks in our models files, or import them from there, or import them from application ready methods.
Process of Task Execution by Celery can be broken down into:Your application sends the tasks to the task broker, it is then reserved by a worker for execution & finally the result of task execution is stored in the result backend.
The "shared_task" decorator allows creation of Celery tasks for reusable apps as it doesn't need the instance of the Celery app. It is also easier way to define a task as you don't need to import the Celery app instance.
The API defines a standard set of execution options, as well as three methods: apply_async(args[, kwargs[, …]]) Sends a task message. delay(*args, **kwargs) Shortcut to send a task message, but does not support execution options.
If anyone needs it, I finally got it working . What I needed to do was run a celery worker for the actual file containing the task inorder for celery to register the task -
celery -A app.tasks.emails worker --loglevel=DEBUG
because simply running
celery -A app.tasks worker --loglevel=DEBUG
(this is my wild guess) would not actually import my send_email() task . If anyone can give me an explanation for this please do.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With