Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to test celery periodic_task in Django?

I have a simple periodic task:

from celery.decorators import periodic_task
from celery.task.schedules import crontab
from .models import Subscription

@periodic_task(run_every=crontab(minute=0, hour=0))
def deactivate_subscriptions():
    for subscription in Subscription.objects.filter(is_expired=True):
        print(subscription)
        subscription.is_active = False
        subscription.can_activate = False
        subscription.save()

And I want to cover it with tests.

I found information about how to test simple tasks, like @shared_task, but nowhere can I find an example of testing @periodic_task

like image 336
Narnik Gamarnik Avatar asked Nov 07 '22 08:11

Narnik Gamarnik


1 Answers

When having a periodic task defined with a decorator you can access the crontab configuration the following way:

tasks.py

@periodic_task(
    run_every=(crontab(minute="*/1")),
)
def my_task():
    pass

some file where you need to access it

from .tasks import my_task

crontab = my_task.run_every

hours_when_it_will_run = crontab.hour
minutes_when_it_will_run = crontab.minute
day_of_the_week_when_it_will_run = crontab.day_of_week
day_of_the_month_when_it_will_run = crontab.day_of_month
month_of_year_when_it_will_run = crontab.month_of_year

This way you can access when task will be executed and you check in your test if the expected time is there.

like image 145
hendrikschneider Avatar answered Nov 09 '22 23:11

hendrikschneider