I have a simple periodic task:
from celery.decorators import periodic_task
from celery.task.schedules import crontab
from .models import Subscription
@periodic_task(run_every=crontab(minute=0, hour=0))
def deactivate_subscriptions():
for subscription in Subscription.objects.filter(is_expired=True):
print(subscription)
subscription.is_active = False
subscription.can_activate = False
subscription.save()
And I want to cover it with tests.
I found information about how to test simple tasks, like @shared_task, but nowhere can I find an example of testing @periodic_task
When having a periodic task defined with a decorator you can access the crontab configuration the following way:
tasks.py
@periodic_task(
run_every=(crontab(minute="*/1")),
)
def my_task():
pass
some file where you need to access it
from .tasks import my_task
crontab = my_task.run_every
hours_when_it_will_run = crontab.hour
minutes_when_it_will_run = crontab.minute
day_of_the_week_when_it_will_run = crontab.day_of_week
day_of_the_month_when_it_will_run = crontab.day_of_month
month_of_year_when_it_will_run = crontab.month_of_year
This way you can access when task will be executed and you check in your test if the expected time is there.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With