I'm trying to learn how to use Celery to check a date every day on one of my models. The model holds an expiration date and a boolean field that says whether the carrier's insurance is expired or not.
The model is pretty big, so I'm going to post a condensed version. I think I have two options: either run a Celery task on the model method, or rewrite the function in my tasks.py. Then I need to use Celery beat to run it on a daily schedule.
I've got the function to work, but I'm passing in the model objects directly, which I think is wrong.
I'm also having trouble working out how to use args in the Celery beat schedule inside my celery.py.
I'm really close to getting this to work, but I think I'm going about executing the task in the wrong way. I think executing a task on the model method might be the cleanest; I'm just not sure how to accomplish that.
models.py
    from django.db import models
    from django.utils import timezone

    class CarrierCompany(models.Model):
        name = models.CharField(max_length=255, unique=True)
        insurance_expiration = models.DateTimeField(null=True)
        insurance_active = models.BooleanField()

        def insurance_expiration_check(self):
            # insurance_expiration is a datetime, so compare it with
            # timezone.now() rather than with a date.
            if self.insurance_expiration > timezone.now():
                self.insurance_active = True
                self.save()
                print("Insurance Active")
            else:
                self.insurance_active = False
                self.save()
                print("Insurance Inactive")
tasks.py
    from __future__ import absolute_import, unicode_literals
    from celery import shared_task
    from django.utils import timezone
    from .models import CarrierCompany

    @shared_task(name="insurance_expired")
    def insurance_date():
        # Take "now" once so every carrier is compared against the same instant.
        now = timezone.now()
        carriers = CarrierCompany.objects.all()
        for carrier in carriers:
            if carrier.insurance_expiration > now:
                carrier.insurance_active = True
                carrier.save()
                print("Insurance Active")
            else:
                carrier.insurance_active = False
                carrier.save()
                print("Insurance Inactive")
celery.py
    from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery
    from celery.schedules import crontab

    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.local')

    app = Celery('POTRTMS')
    app.config_from_object('django.conf:settings', namespace='CELERY')
    app.autodiscover_tasks()

    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))

    app.conf.beat_schedule = {
        'check-insurance-daily': {
            'task': 'insurance_expired',
            # minute defaults to '*', so set it explicitly to run once at 08:00
            # instead of every minute of that hour.
            'schedule': crontab(hour=8, minute=0)
        },
    }
*** updated the beat schedule to reflect when I actually want to run it.
Celery makes it easier to implement task queues served by many workers in a Django application.
All tasks must be imported during Django and Celery startup so that Celery knows about them. Putting them in <appname>/tasks.py files and calling app.autodiscover_tasks() takes care of that. Alternatively, you could define the tasks in your models files, import them from there, or import them from an app config's ready() method.
Celery is an open-source Python library used to run tasks asynchronously. It is a task queue that holds tasks and distributes them to workers. It is primarily focused on real-time operation but also supports scheduling (running tasks at regular intervals).
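On the scheduling side, a beat entry is a plain dictionary, and positional arguments for the task go under its `args` key. Here is a minimal sketch, assuming the `CELERY` settings namespace used in the question's celery.py (which makes the Django setting name `CELERY_BEAT_SCHEDULE`). The task name `check_one_carrier` and the primary key 42 are hypothetical, included only to show passing an ID rather than a model instance.

```python
from datetime import timedelta

# Would live in the Django settings module, which the app reads via
# app.config_from_object('django.conf:settings', namespace='CELERY').
CELERY_BEAT_SCHEDULE = {
    # The task from the question takes no arguments.
    "check-insurance-daily": {
        "task": "insurance_expired",
        "schedule": timedelta(days=1),  # a timedelta works as an interval
        "args": (),
    },
    # Hypothetical per-carrier task that receives a primary key.
    "check-one-carrier": {
        "task": "check_one_carrier",  # hypothetical task name
        "schedule": timedelta(days=1),
        "args": (42,),  # hypothetical primary key, not a model instance
    },
}
```

Passing a primary key keeps the message small and serializable; the task can then fetch a fresh copy of the row when it actually runs.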
If your Django app uses time zones, you will probably want Django's timezone utilities (django.utils.timezone) instead of the plain datetime module. An example of how I might do it is as follows.
models.py
    from django.db import models
    from django.utils import timezone

    class CarrierCompany(models.Model):
        ...

        @property
        def is_insurance_expired(self):
            # Assumption: a carrier with no expiration date counts as expired.
            if self.insurance_expiration is None:
                return True
            # Expired once the expiration moment is no longer in the future.
            return self.insurance_expiration <= timezone.now()
tasks.py
    from celery import shared_task
    from .models import CarrierCompany

    @shared_task(name="insurance_expired")
    def insurance_date():
        carriers = CarrierCompany.objects.all()
        for carrier in carriers:
            # Active is simply the opposite of expired.
            carrier.insurance_active = not carrier.is_insurance_expired
            carrier.save()
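On the timezone point: with time zone support enabled, a DateTimeField gives back an aware datetime, and comparing that with a naive one fails. The sketch below uses only the standard library to show the difference (note that `timezone` here is `datetime.timezone`, not Django's module; the helper name `compare_with_naive` is made up for the illustration).

```python
from datetime import datetime, timedelta, timezone

# An aware value, similar to what a DateTimeField returns with USE_TZ enabled.
expiration = datetime.now(timezone.utc) + timedelta(days=30)

def compare_with_naive(value):
    """Return the comparison result, or the error message if it fails."""
    try:
        return value > datetime.today()  # datetime.today() is naive
    except TypeError as exc:
        return str(exc)

# Mixing aware and naive values raises TypeError, captured here as text.
naive_result = compare_with_naive(expiration)

# Comparing two aware values works as expected.
aware_result = expiration > datetime.now(timezone.utc)
```

In a Django project, `django.utils.timezone.now()` plays the role of the aware "now" used in the last line.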
There are other things you could do, like skipping the update when the value is already False and making False the default, or vice versa for the True case. You could also do all of that in the model method, though I personally like to keep the logic a bit separate (that's just how I organize my stuff). Hopefully this helps get you past where you are stuck.
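The "only update when something changed" idea could look roughly like the sketch below. It uses a small dataclass as a hypothetical stand-in for CarrierCompany so it runs without Django; the names `Carrier` and `refresh_flags` are made up for the illustration, and treating a missing date as inactive is an assumption.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

@dataclass
class Carrier:
    """Hypothetical stand-in for CarrierCompany (no database involved)."""
    name: str
    insurance_expiration: Optional[datetime]
    insurance_active: bool = False

def refresh_flags(carriers, now):
    """Update insurance_active in place; return the carriers whose flag changed."""
    changed = []
    for carrier in carriers:
        expires = carrier.insurance_expiration
        # Assumption: a missing expiration date means the insurance is inactive.
        should_be_active = expires is not None and expires > now
        if carrier.insurance_active != should_be_active:
            carrier.insurance_active = should_be_active
            changed.append(carrier)  # with a real model, call carrier.save() here
    return changed

now = datetime(2024, 1, 1, tzinfo=timezone.utc)
carriers = [
    Carrier("A", now + timedelta(days=10), insurance_active=True),  # still valid
    Carrier("B", now - timedelta(days=1), insurance_active=True),   # lapsed
    Carrier("C", None, insurance_active=False),                     # no date set
]
changed = refresh_flags(carriers, now)
```

Only carrier "B" ends up in `changed`, so only that row would be written. With the real model, a similar effect may be possible in a single query, for example filtering on `insurance_expiration__lte` and calling `.update(insurance_active=False)` on the queryset.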