I have a simple Django (v3.1) app where I receive data from a form, process it with a view and then pass it on to Celery (v4.4.7, RabbitMQ as broker). Depending on the data submitted in the form, it can be a one-time task or a periodic task.
The periodic task should execute the same task as the one-time task, but on a periodic schedule. I would like to pass that schedule to the task, including a start date, an end date and an interval (e.g. execute every 2 days at 4pm, starting now and ending in 4 weeks).
My view (shortened and renamed for illustration purposes, of course):
# views.py
if request.method == 'POST':
    form = BackupForm(request.POST)
    if form.is_valid():
        data = ...
        if not form.cleaned_data['periodic']:
            # execute one-time task
            celery_task = single_task.delay(data)
        else:
            schedule = {
                'first_backup': form.cleaned_data['first_backup'],
                'last_backup': form.cleaned_data['last_backup'],
                'intervall_every': form.cleaned_data['intervall_every'],
                'intervall_unit': form.cleaned_data['intervall_unit'],
                'intervall_time': form.cleaned_data['intervall_time'],
            }
            # execute periodic task, depending on the schedule submitted in the form
            celery_task = single_task.delay(data, schedule=schedule)
        return HttpResponseRedirect(reverse('app:index'))
The single task looks like this:
# tasks.py
import asyncio

from celery import shared_task

@shared_task
def single_task(data: dict, **kwargs) -> None:
    asyncio.run(bulk_screen(data=data))
    # TODO: receive schedule if periodic and create a periodic task with it
This works well for the single task. However, I don't know how to adapt this to create dynamic periodic tasks. My schedule data varies, depending on the users' form input. I have to create the periodic task at runtime.
According to the official documentation on periodic tasks, crontab schedules are what I need:
from celery.schedules import crontab

app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}
Although this looks fine, this sits in the celery config with hardcoded schedules.
I also read about the on_after_finalize.connect decorator where I could do something like this:
@celery_app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(10.0, task123.s('hello'))
But I don't know how to pass the schedule to this function. Also, what is the sender? Can I pass it from my view?
Then I read about populating the relevant models in celery beat here. But I guess there has to be a more elegant way, using the stable version without deprecated decorators.
Thank you.
First, open a new shell or window. In that shell, set up the same Django development environment - activate your virtual environment, or add things to your Python path, whatever you do so that you could use runserver to run your project.
We can configure periodic tasks either by manually adding the configurations to the celery.py module or using the django-celery-beat package which allows us to add periodic tasks from the Django Admin by extending the Admin functionality to allow scheduling tasks.
As far as I know, celeryd is just an old name for the celery worker command. celerybeat is a scheduler that sends predefined tasks to a celery worker at a given time. You only need to bother with this if you want to run a task on a schedule.
You should definitely try populating the PeriodicTask or CrontabSchedule models provided with the django_celery_beat package (link to docs).
Celery beat is the scheduler process: it wakes up periodically, checks which tasks are due according to its schedule (a database-backed one in the case of django_celery_beat) and sends them to the workers for execution. Reference1, Reference2
Celery beat is certainly the cleanest way to handle periodic tasks with different schedules, rather than writing your own scheduler or juggling the different schedules by hand.
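As a rough illustration of populating those models from the view, here is a minimal sketch rather than a definitive implementation. It assumes django_celery_beat is installed and listed in INSTALLED_APPS, that the form's intervall_unit values match the period names of IntervalSchedule ('days', 'hours', 'minutes', 'seconds'), that first_backup and last_backup are datetimes, and that first_backup already carries the desired time of day (intervall_time is not used here). The helper names build_periodic_task_fields and create_periodic_backup are hypothetical. The translation step is kept in plain Python so it can be checked without Django.

```python
import json

# Assumption: the form's 'intervall_unit' choices use the same strings as
# django_celery_beat's IntervalSchedule periods.
ALLOWED_PERIODS = {"days", "hours", "minutes", "seconds"}


def build_periodic_task_fields(name, task_path, data, schedule):
    """Translate the view's schedule dict into plain model field values.

    Hypothetical helper; pure Python, no Django or Celery needed.
    """
    period = schedule["intervall_unit"]
    if period not in ALLOWED_PERIODS:
        raise ValueError(f"unsupported interval unit: {period!r}")
    every = int(schedule["intervall_every"])
    if every < 1:
        raise ValueError("interval must be at least 1")
    return {
        "interval": {"every": every, "period": period},
        "task": {
            "name": name,                        # must be unique per PeriodicTask
            "task": task_path,                   # dotted path of the registered task
            "args": json.dumps([data]),          # data must be JSON-serializable
            "start_time": schedule["first_backup"],
            "expires": schedule["last_backup"],  # beat stops scheduling after this
        },
    }


def create_periodic_backup(name, task_path, data, schedule):
    """Store the schedule in the database so celery beat picks it up."""
    # Imported lazily so the helper above stays usable without Django.
    from django_celery_beat.models import IntervalSchedule, PeriodicTask

    fields = build_periodic_task_fields(name, task_path, data, schedule)
    interval, _ = IntervalSchedule.objects.get_or_create(**fields["interval"])
    return PeriodicTask.objects.create(interval=interval, **fields["task"])
```

In the else branch of the view, a call such as create_periodic_backup('backup-<some unique id>', 'app.tasks.single_task', data, schedule) would take the place of single_task.delay(data, schedule=schedule); the task path shown is a guess at the project layout. For the rows to have any effect, beat has to run with the database scheduler, e.g. celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler, where proj stands for your project name. Note that an interval schedule counts from the previous run, so whether "every 2 days at 4pm" holds exactly over a long period is worth verifying for your setup.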