Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Connect new celery periodic task in django

It's not a question but help to those who will find that the declaration of periodic tasks described in celery 4.0.1 documentation is hard to integrate in django: http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#entries

copy paste celery config file main_app/celery.py:

from celery import Celery
from celery.schedules import crontab

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)

Question

But what if we use django and our tasks are placed in another app? With celery 4.0.1 we no longer have @periodic_task decorator. So let's see what we can do.

First case

If you prefer to keep tasks and their schedule close to each other:

main_app/some_app/tasks.py

from main_app.celery import app as celery_app

@celery_app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # Calls test('hello') every 10 seconds.
        sender.add_periodic_task(10.0, test.s('hello'))

@celery_app.task
def test(arg):
    print(arg)

We can run beat in debug mode:

celery -A main_app beat -l debug

and we will see that there's no such periodic task.

Second case

We can try to describe all periodic tasks in config file like this:

main_app/celery.py

...
app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    from main_app.some_app.tasks import test
    sender.add_periodic_task(10.0, test.s('hello'))
...

The result is the same. But it will behave differently that you can see with manual debugging via pdb. In first example setup_periodic_tasks callback will not be fired at all. But in second example we'll get django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet. (this exception will not be print out)

like image 441
vvkuznetsov Avatar asked Dec 13 '16 10:12

vvkuznetsov


People also ask

How do I import a periodic task into Celery?

schedules import crontab from celery. decorators import periodic_task @periodic_task(run_every=crontab(hour=7, minute=30, day_of_week=1)) def every_monday_morning(): print("Execute every Monday at 7:30AM.") Execute every minute. Execute daily at midnight.

How do I schedule a task in Django Celery?

First, open a new shell or window. In that shell, set up the same Django development environment - activate your virtual environment, or add things to your Python path, whatever you do so that you could use runserver to run your project.

How do you create a periodic task in Django?

We can configure periodic tasks either by manually adding the configurations to the celery.py module or using the django-celery-beat package which allows us to add periodic tasks from the Django Admin by extending the Admin functionality to allow scheduling tasks.


3 Answers

For django we need to use another signal: @celery_app.on_after_finalize.connect. It can be used for both:

  • declaration of task schedule close to task in app/tasks.py because this signal will be fired after all tasks.py imported and all possible receivers already subscribed (first case).
  • centralized schedule declaration because django apps will be already initialized and ready for imports (second case)

I think I should write down final declaration:

First case

Declaration of task schedule close to task:

main_app/some_app/tasks.py

from main_app.celery import app as celery_app

@celery_app.on_after_finalize.connect
    def setup_periodic_tasks(sender, **kwargs):
        # Calls test('hello') every 10 seconds.
        sender.add_periodic_task(10.0, test.s('hello'))

@celery_app.task
def test(arg):
    print(arg)

Second case

Centralized schedule declaration in config file main_app/celery.py:

...

app = Celery()

@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    from main_app.some_app.tasks import test
    sender.add_periodic_task(10.0, test.s('hello'))
...
like image 150
vvkuznetsov Avatar answered Oct 19 '22 09:10

vvkuznetsov


If the intent is to maintain task logic separately in tasks.py, then calling from main_app.some_app.tasks import test inside setup_periodic_tasks did not work for me. What worked is the following:

celery.py

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

@app.task
def test(arg):
    print(arg)
    from some_app.tasks import test
    test(arg)

tasks.py

@shared_task
def test(arg):
    print('world')

This resulted in the following output:

[2017-10-26 22:52:42,262: INFO/MainProcess] celery@ubuntu-xenial ready.
[2017-10-26 22:52:42,263: INFO/MainProcess] Received task: main_app.celery.test[3cbdf4fa-ff63-401a-a9e4-cfd1b6bb4ad4]  
[2017-10-26 22:52:42,367: WARNING/ForkPoolWorker-2] hello
[2017-10-26 22:52:42,368: WARNING/ForkPoolWorker-2] world
[2017-10-26 22:52:42,369: INFO/ForkPoolWorker-2] Task main_app.celery.test[3cbdf4fa-ff63-401a-a9e4-cfd1b6bb4ad4] succeeded in 0.002823335991706699s: None
[2017-10-26 22:52:51,205: INFO/Beat] Scheduler: Sending due task add every 10 (main_app.celery.test)
[2017-10-26 22:52:51,207: INFO/MainProcess] Received task: main_app.celery.test[ce0f3cfc-54d5-4d74-94eb-7ced2e5a6c4b]  
[2017-10-26 22:52:51,209: WARNING/ForkPoolWorker-2] hello
[2017-10-26 22:52:51,209: WARNING/ForkPoolWorker-2] world
like image 30
Prasanna Avatar answered Oct 19 '22 10:10

Prasanna


If you want to use task logic seperately, use this setup:

celery.py:

import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'backend.settings') # your settings.py path

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(5, periodic_task.s('sms'), name='SMS Process')
    sender.add_periodic_task(60, periodic_task.s('email'), name='Email Process')


@app.task
def periodic_task(taskname):
    from myapp.tasks import sms_process, email_process

    if taskname == 'sms':
        sms_process()

    elif taskname == 'email':
        email_process()

a sample task in a django app named myapp:

myapp/tasks.py:

def sms_process():
    print('send sms task')

def email_process():
    print('send email task')
like image 1
suhailvs Avatar answered Oct 19 '22 10:10

suhailvs