I'm using Celery 4.0.1 with Django 1.10 and I'm having trouble scheduling tasks (running a task works fine). Here is the Celery configuration:
import os

from celery import Celery
from django.conf import settings
from kombu import Queue

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')
app = Celery('myapp')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.conf.BROKER_URL = 'amqp://{}:{}@{}'.format(settings.AMQP_USER, settings.AMQP_PASSWORD, settings.AMQP_HOST)
app.conf.CELERY_DEFAULT_EXCHANGE = 'myapp.celery'
app.conf.CELERY_DEFAULT_QUEUE = 'myapp.celery_default'
app.conf.CELERY_TASK_SERIALIZER = 'json'
app.conf.CELERY_ACCEPT_CONTENT = ['json']
app.conf.CELERY_IGNORE_RESULT = True
app.conf.CELERY_DISABLE_RATE_LIMITS = True
app.conf.BROKER_POOL_LIMIT = 2
app.conf.CELERY_QUEUES = (
    Queue('myapp.celery_default'),
    Queue('myapp.queue1'),
    Queue('myapp.queue2'),
    Queue('myapp.queue3'),
)
Then in tasks.py I have:
@app.task(queue='myapp.queue1')
def my_task(some_id):
    print("Doing something with", some_id)
In views.py I want to schedule this task:
def my_view(request, id):
    app.add_periodic_task(10, my_task.s(id))
Then I execute the commands:
sudo systemctl start rabbitmq.service
celery -A myapp.celery_app beat -l debug
celery worker -A myapp.celery_app
But the task is never scheduled, and I don't see anything in the logs. The task itself works, because if I do the following in my view:
def my_view(request, id):
    my_task.delay(id)
The task is executed.
If I schedule the task manually in my configuration file, like this, it works:
app.conf.CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks.my_task',
        'schedule': 10.0,
        'args': (66,)
    },
}
I just can't schedule the task dynamically. Any idea?
The latest release, 4.1.0, has addressed this in ticket #3958, which has been merged.
Actually, you can't define a periodic task at the view level, because the beat schedule setting is loaded first and cannot be rescheduled at runtime:
The add_periodic_task() function will add the entry to the beat_schedule setting behind the scenes, and the same setting can also be used to set up periodic tasks manually:
app.conf.CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks.my_task',
        'schedule': 10.0,
        'args': (66,)
    },
}
This means that if you want to use add_periodic_task(), it should be wrapped in an on_after_configure handler at the Celery app level, and any modification at runtime will not take effect:
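from celery import Celery

app = Celery()

# Runs once, after the app is configured, in the process that loads the schedule.
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(10, my_task.s(66))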
As mentioned in the docs, the regular celery beat simply keeps track of task execution:
The default scheduler is the celery.beat.PersistentScheduler, that simply keeps track of the last run times in a local shelve database file.
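To make the quoted behaviour concrete, here is a toy sketch of a scheduler that only remembers last run times in a shelve file. It is not Celery's PersistentScheduler code; the helper names is_due and tick, the file name, and the simulated clock values are all assumptions made for illustration.

```python
import os
import shelve
import tempfile


def is_due(db, name, interval, now):
    """Return True if `name` never ran or `interval` seconds have passed."""
    last_run = db.get(name)
    return last_run is None or now - last_run >= interval


def tick(path, schedule, now):
    """Run one scheduler pass and return the names of the tasks that were due."""
    sent = []
    with shelve.open(path) as db:
        for name, interval in schedule.items():
            if is_due(db, name, interval, now):
                sent.append(name)
                db[name] = now  # persisted, so it survives a restart
    return sent


# Hypothetical fixed schedule: task name -> interval in seconds.
schedule = {'tasks.my_task': 10}
path = os.path.join(tempfile.mkdtemp(), 'toy-beat-schedule')

# Simulated clock values (seconds) instead of real time, to keep this deterministic.
first = tick(path, schedule, now=0)    # never ran before, so it is due
second = tick(path, schedule, now=5)   # only 5 s later, not due yet
third = tick(path, schedule, now=10)   # a full interval later, due again
```

Note that nothing in this model lets another process add or change entries: the file only records when each known task last ran, which is why a schedule change made from a view has no effect.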
In order to be able to dynamically manage periodic tasks and reschedule celerybeat at runtime:
There’s also the django-celery-beat extension that stores the schedule in the Django database, and presents a convenient admin interface to manage periodic tasks at runtime.
The tasks will be persisted in the Django database, and the schedule can be updated through the task model at the database level. Whenever you update a periodic task, a counter in the tasks table is incremented, which tells the celery beat service to reload the schedule from the database.
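The reload mechanism described above can be pictured with a small in-memory model. This is only a sketch of the idea, not django-celery-beat's actual implementation; ToyTaskTable and ToyBeat are hypothetical stand-ins for the database table and the beat service.

```python
class ToyTaskTable:
    """Stands in for the database table (hypothetical, not the real models)."""

    def __init__(self):
        self.rows = {}    # name -> (task, interval_seconds, args)
        self.changes = 0  # bumped on every write

    def save(self, name, task, interval, args):
        self.rows[name] = (task, interval, list(args))
        self.changes += 1


class ToyBeat:
    """Stands in for the beat service: reloads only when the counter moved."""

    def __init__(self, table):
        self.table = table
        self.seen_changes = -1
        self.schedule = {}
        self.reloads = 0

    def sync(self):
        if self.seen_changes != self.table.changes:
            self.schedule = dict(self.table.rows)
            self.seen_changes = self.table.changes
            self.reloads += 1
        return self.schedule


table = ToyTaskTable()
beat = ToyBeat(table)

table.save('any name', 'tasks.my_task', 10, [66])
beat.sync()                                        # first load
beat.sync()                                        # nothing changed, no reload
table.save('any name', 'tasks.my_task', 10, [42])  # what a view would do
current = beat.sync()                              # counter moved, schedule reloaded
```

The point of the design is that the web process and the beat process only share the database, so the view never has to talk to beat directly.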
A possible solution for you could be as follows:
import json

from django_celery_beat.models import PeriodicTask, IntervalSchedule

# Run every 10 seconds.
schedule = IntervalSchedule.objects.create(every=10, period=IntervalSchedule.SECONDS)
task = PeriodicTask.objects.create(interval=schedule, name='any name', task='tasks.my_task', args=json.dumps([66]))
views.py
import json

from django_celery_beat.models import PeriodicTask

def update_task_view(request, id):
    task = PeriodicTask.objects.get(name='any name')  # assuming names are unique; must match the name used at creation
    task.args = json.dumps([id])
    task.save()
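For the database-backed schedule to be picked up, the extension also has to be installed in the Django project and beat has to be started with its scheduler. A minimal sketch of that setup follows; the variable name DATABASE_SCHEDULER is only a placeholder for illustration, and the commands in the comments reuse the myapp.celery_app module from the question.

```python
# settings.py (fragment): register the extension so its models get migrated.
INSTALLED_APPS = [
    # ... existing apps ...
    'django_celery_beat',
]

# Dotted path of the database-backed scheduler shipped with django-celery-beat.
# DATABASE_SCHEDULER is a hypothetical name used here only to hold the string.
DATABASE_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

# Then, outside Python:
#   python manage.py migrate
#   celery -A myapp.celery_app beat -l debug --scheduler django_celery_beat.schedulers:DatabaseScheduler
```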