Python 3.x, Celery 4.x...
I have a class-based task.
myproj/celery.py
from celery import Celery
# django settings stuff...
app = Celery('myproj')
app.autodiscover_tasks()
app1/tasks.py
import celery
class EmailTask(celery.Task):
def run(self, *args, **kwargs):
self.do_something()
If I do:
$ celery worker -A myproj -l info
[tasks]
. app2.tasks.debug_task
. app2.tasks.test
So, the celery decorators work to register tasks, but the class-based task is not registered.
Update 1:
If I add the following lines to app1/tasks.py
from myproj.celery import app
email_task = app.tasks[EmailTask.name]
.
$ celery worker -A myproj -l info
File "myproj/app1/tasks.py", line 405, in <module>
email_task = app.tasks[EmailTask.name]
File "/usr/local/lib/python3.5/site-packages/celery/app/registry.py", line 19, in __missing__
raise self.NotRegistered(key)
celery.exceptions.NotRegistered
Update 2:
I am able to execute my task synchronously (run
) via a wrapper. However, I cannot run the task async, i.e., via delay
.
app1/tasks.py
@app.task
def email_task():
"""
Wrapper to call class based task
"""
task = EmailTask()
# task.delay() # Won't work!!!
task.run()
.
$./manage.py shell
> from app1.tasks import EmailTask
> task1 = EmailTask()
> task1.run() # a-okay
> task2 = EmailTask()
> task2.delay() # nope
<AsyncResult: 1c03bad9-169a-4a4e-a56f-7d83892e8bbc>
# And on the worker...
[2017-01-22 08:07:28,120: INFO/PoolWorker-1] Task app1.tasks.email_task[41e5bc7d-058a-400e-9f73-c853c0f60a2a] succeeded in 0.0701281649817247s: None
[2017-01-22 08:10:31,909: ERROR/MainProcess] Received unregistered task of type None.
The message has been ignored and discarded.
Register a task in the task registry. The task will be automatically instantiated if not already an instance. Name must be configured prior to registration. Unregister task by name.
Celery is a task queue/job queue based on asynchronous message passing. It can be used as a background task processor for your application in which you dump your tasks to execute in the background or at any given moment. It can be configured to execute your tasks synchronously or asynchronously.
Since Celery 2.2.0, information related to the currently executed task is saved to task.request (it's called «the context»). So you should get task id from this context (not from keyword arguments, which are deprecated): @task def do_job(path): cache.
You can find full description here, but for me it was enough to add
from myapp.celery import app
app.tasks.register(MyTaskTask())
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With