Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Django Celery: create periodic task at runtime with schedule depending on user input

I have a simple Django (v3.1) app where I receive data from a form, process it with a view and then pass it on to Celery (v4.4.7, RabbitMQ as broker). Depending on the data submitted in the form, it can be a one-time task or a periodic task.

The periodic task should execute the same task as the one-time task, but, well, with a periodic schedule. I would like to pass that schedule to the task, including a start date, end date and an intervall (e.g.: execute every 2 days at 4pm, starting now until 4 weeks).

My view (shortened and renamed for illustration purposes, of course):

# views.py

if request.method == 'POST':
    form = BackupForm(request.POST)
        
    if form.is_valid():
        data = ...

        if not form.cleaned_data['periodic']: 
            # execute one-time task 
            celery_task = single_task.delay(data)

        else:
            schedule = {
                'first_backup': form.cleaned_data['first_backup'],
                'last_backup': form.cleaned_data['last_backup'],
                'intervall_every': form.cleaned_data['intervall_every'],
                'intervall_unit': form.cleaned_data['intervall_unit'],
                'intervall_time': form.cleaned_data['intervall_time'],
            }

            # execute periodic task, depending on the schedule submitted in the form
            celery_task = single_task.delay(data, schedule=schedule)

        return HttpResponseRedirect(reverse('app:index'))

The single task looks like this:

# tasks.py

@shared_task
def single_task(data: dict, **kwargs) -> None:
    asyncio.run(bulk_screen(data=data))
    # TODO: receive schedule if periodic and create a periodic task with it

This works well for the single task. However, I don't know how to adapt this to create dynamic periodic tasks. My schedule data varies, depending on the users' form input. I have to create the periodic task at runtime.

According to the official documentation on periodic tasks, crontab schedules is what I need:

from celery.schedules import crontab

app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}

Although this looks fine, this sits in the celery config with hardcoded schedules.

I also read about the on_after_finalize.connect decorator where I could do something like this:

@celery_app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(10.0, task123.s('hello'))

But I don't know how to pass the schedule to this function. Also, what is the sender? Can I pass it from my view?

Then I read about populating the relevant models in celery beat here. But I guess there has to be a more elegant way, using the stable version without deprecated decorators.

Thank you.

like image 980
sqe Avatar asked Nov 10 '20 13:11

sqe


People also ask

How do I schedule a task in django celery?

First, open a new shell or window. In that shell, set up the same Django development environment - activate your virtual environment, or add things to your Python path, whatever you do so that you could use runserver to run your project.

How do you create a periodic task in django?

We can configure periodic tasks either by manually adding the configurations to the celery.py module or using the django-celery-beat package which allows us to add periodic tasks from the Django Admin by extending the Admin functionality to allow scheduling tasks.

What is the difference between celery and celery beat?

As far as I know, celeryd is just an old name for the celery worker command. celerybeat is a scheduler that sends predefined tasks to a celery worker at a given time. You only need to bother with this if you want to run a task on a schedule.


1 Answers

You should definitely try populating the PeriodicTask or CrontabSchedule provided with the django_celery_beat package.link to docs

Celery beat is the scheduler which runs periodically and it will simply execute all the tasks based on a schedule (a database backed one in case of django_celery_beat). Reference1, Reference2

Celery beat is certainly the cleanest way to handle periodic tasks with different schedules instead of creating your own scheduler or handle different schedules.

like image 151
Aman Garg Avatar answered Oct 26 '22 03:10

Aman Garg