
Register Celery Class-based Task

Python 3.x, Celery 4.x...

I have a class-based task.

myproj/celery.py

from celery import Celery

# django settings stuff...

app = Celery('myproj')
app.autodiscover_tasks()

app1/tasks.py

import celery

class EmailTask(celery.Task):
    def run(self, *args, **kwargs):
        self.do_something()

If I start the worker, only the decorator-based tasks are listed:

$ celery worker -A myproj -l info
[tasks]
  . app2.tasks.debug_task
  . app2.tasks.test

So tasks defined with the Celery decorators are registered, but the class-based task is not.

How do I get the class-based tasks to register?

Update 1:

If I add the following lines to app1/tasks.py

from myproj.celery import app
email_task = app.tasks[EmailTask.name]

I get:

$ celery worker -A myproj -l info
  File "myproj/app1/tasks.py", line 405, in <module>
    email_task = app.tasks[EmailTask.name]
  File "/usr/local/lib/python3.5/site-packages/celery/app/registry.py", line 19, in __missing__
    raise self.NotRegistered(key)
celery.exceptions.NotRegistered

Update 2:

I am able to execute my task synchronously (run) via a wrapper. However, I cannot run the task async, i.e., via delay.

app1/tasks.py

@app.task
def email_task():
    """
    Wrapper to call class based task
    """
    task = EmailTask()
    # task.delay()  # Won't work!!!
    task.run()

Then, in the Django shell:

$ ./manage.py shell
> from app1.tasks import EmailTask
> task1 = EmailTask()
> task1.run() # a-okay
> task2 = EmailTask()
> task2.delay() # nope
  <AsyncResult: 1c03bad9-169a-4a4e-a56f-7d83892e8bbc>

# And on the worker...
[2017-01-22 08:07:28,120: INFO/PoolWorker-1] Task app1.tasks.email_task[41e5bc7d-058a-400e-9f73-c853c0f60a2a] succeeded in 0.0701281649817247s: None
[2017-01-22 08:10:31,909: ERROR/MainProcess] Received unregistered task of type None.
The message has been ignored and discarded.
Asked by James on Jan 22 '17 at 06:01



1 Answer

You can find a full description here, but for me it was enough to add:

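# myapp is the project package that defines the Celery app (myproj in the question)
from myapp.celery import app
# Register an instance of the Task subclass (EmailTask in the question)
app.tasks.register(MyTaskTask())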
Answered by Oleksandr Dashkov on Sep 30 '22 at 17:09