It's not a question but help to those who will find that the declaration of periodic tasks described in celery 4.0.1 documentation is hard to integrate in django: http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#entries
copy paste celery config file main_app/celery.py
:
from celery import Celery
from celery.schedules import crontab
app = Celery()
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls test('hello') every 10 seconds.
sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
# Calls test('world') every 30 seconds
sender.add_periodic_task(30.0, test.s('world'), expires=10)
# Executes every Monday morning at 7:30 a.m.
sender.add_periodic_task(
crontab(hour=7, minute=30, day_of_week=1),
test.s('Happy Mondays!'),
)
@app.task
def test(arg):
print(arg)
But what if we use django and our tasks are placed in another app? With celery 4.0.1
we no longer have @periodic_task
decorator. So let's see what we can do.
If you prefer to keep tasks and their schedule close to each other:
main_app/some_app/tasks.py
from main_app.celery import app as celery_app
@celery_app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls test('hello') every 10 seconds.
sender.add_periodic_task(10.0, test.s('hello'))
@celery_app.task
def test(arg):
print(arg)
We can run beat
in debug mode:
celery -A main_app beat -l debug
and we will see that there's no such periodic task.
We can try to describe all periodic tasks in config file like this:
main_app/celery.py
...
app = Celery()
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls test('hello') every 10 seconds.
from main_app.some_app.tasks import test
sender.add_periodic_task(10.0, test.s('hello'))
...
The result is the same. But it will behave differently that you can see with manual debugging via pdb. In first example setup_periodic_tasks
callback will not be fired at all. But in second example we'll get django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.
(this exception will not be print out)
schedules import crontab from celery. decorators import periodic_task @periodic_task(run_every=crontab(hour=7, minute=30, day_of_week=1)) def every_monday_morning(): print("Execute every Monday at 7:30AM.") Execute every minute. Execute daily at midnight.
First, open a new shell or window. In that shell, set up the same Django development environment - activate your virtual environment, or add things to your Python path, whatever you do so that you could use runserver to run your project.
We can configure periodic tasks either by manually adding the configurations to the celery.py module or using the django-celery-beat package which allows us to add periodic tasks from the Django Admin by extending the Admin functionality to allow scheduling tasks.
For django we need to use another signal: @celery_app.on_after_finalize.connect
. It can be used for both:
app/tasks.py
because this signal will be fired after all tasks.py
imported and all possible receivers already subscribed (first case).I think I should write down final declaration:
Declaration of task schedule close to task:
main_app/some_app/tasks.py
from main_app.celery import app as celery_app
@celery_app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls test('hello') every 10 seconds.
sender.add_periodic_task(10.0, test.s('hello'))
@celery_app.task
def test(arg):
print(arg)
Centralized schedule declaration in config file main_app/celery.py
:
...
app = Celery()
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls test('hello') every 10 seconds.
from main_app.some_app.tasks import test
sender.add_periodic_task(10.0, test.s('hello'))
...
If the intent is to maintain task logic separately in tasks.py, then calling from main_app.some_app.tasks import test
inside setup_periodic_tasks
did not work for me. What worked is the following:
celery.py
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls test('hello') every 10 seconds.
sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
@app.task
def test(arg):
print(arg)
from some_app.tasks import test
test(arg)
tasks.py
@shared_task
def test(arg):
print('world')
This resulted in the following output:
[2017-10-26 22:52:42,262: INFO/MainProcess] celery@ubuntu-xenial ready.
[2017-10-26 22:52:42,263: INFO/MainProcess] Received task: main_app.celery.test[3cbdf4fa-ff63-401a-a9e4-cfd1b6bb4ad4]
[2017-10-26 22:52:42,367: WARNING/ForkPoolWorker-2] hello
[2017-10-26 22:52:42,368: WARNING/ForkPoolWorker-2] world
[2017-10-26 22:52:42,369: INFO/ForkPoolWorker-2] Task main_app.celery.test[3cbdf4fa-ff63-401a-a9e4-cfd1b6bb4ad4] succeeded in 0.002823335991706699s: None
[2017-10-26 22:52:51,205: INFO/Beat] Scheduler: Sending due task add every 10 (main_app.celery.test)
[2017-10-26 22:52:51,207: INFO/MainProcess] Received task: main_app.celery.test[ce0f3cfc-54d5-4d74-94eb-7ced2e5a6c4b]
[2017-10-26 22:52:51,209: WARNING/ForkPoolWorker-2] hello
[2017-10-26 22:52:51,209: WARNING/ForkPoolWorker-2] world
If you want to use task logic seperately, use this setup:
celery.py:
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'backend.settings') # your settings.py path
app = Celery()
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(5, periodic_task.s('sms'), name='SMS Process')
sender.add_periodic_task(60, periodic_task.s('email'), name='Email Process')
@app.task
def periodic_task(taskname):
from myapp.tasks import sms_process, email_process
if taskname == 'sms':
sms_process()
elif taskname == 'email':
email_process()
a sample task in a django app named myapp
:
myapp/tasks.py:
def sms_process():
print('send sms task')
def email_process():
print('send email task')
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With