Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Celery Tasks on Django Models

I'm trying to learn how to use celery to check a date every day on one of my models. One of my models holds an expiration date and a boolean field that says whether their insurance is expired or not.

The model is pretty big so I'm going to post a condensed version. I think I have two options. Either run a celery task on the model method or rewrite the function in my tasks.py. Then I need to use Celery beat to run the schedule to check daily.

I've got the function to work, but I'm passing in the model objects directly which I think is wrong.

I'm also having trouble on how to use the args in the celery beat scheduler inside of my celery.py.

I'm really close to getting this to work, but I think I'm going about executing a task in the wrong way. and I think executing a task on the model method might be the cleanest, I'm just not sure how to accomplish that.

models.py

class CarrierCompany(models.Model):
    name = models.CharField(max_length=255, unique=True)
    insurance_expiration = models.DateTimeField(null=True)
    insurance_active = models.BooleanField()

    def insurance_expiration_check(self):
        if self.insurance_expiration > datetime.today().date():
            self.insurance_active = True
            self.save()
            print("Insurance Active")
        else:
            self.insurance_active = False
            self.save()
            print("Insurance Inactive")

tasks.py

from __future__ import absolute_import, unicode_literals
from celery.decorators import task
from datetime import datetime, date
from django.utils import timezone
from .models import CarrierCompany



@task(name="insurance_expired")
def insurance_date():
    carriers = CarrierCompany.objects.all()
    for carrier in carriers:
        date = datetime.now(timezone.utc)
        if carrier.insurance_expiration > date:
            carrier.insurance_active = True
            carrier.save()
            print("Insurance Active")
        else:
            carrier.insurance_active = False
            carrier.save()
            print("Insurance Inactive")

celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from celery.schedules import crontab

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.local')

app = Celery('POTRTMS')

app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))


app.conf.beat_schedule = {
    'check-insurance-daily': {
        'task': 'insurance_expired',
        'schedule': crontab(hour='8')
    },
}

*** updated the beat schedule to reflect when I actually want to run it.

like image 357
Kris Tryber Avatar asked Mar 05 '18 17:03

Kris Tryber


People also ask

Should I use Celery with Django?

Celery makes it easier to implement the task queues for many workers in a Django application.

How do I add a task to Celery?

All tasks must be imported during Django and Celery startup so that Celery knows about them. If we put them in <appname>/tasks.py files and call app. autodiscover_tasks(), that will do it. Or we could put our tasks in our models files, or import them from there, or import them from application ready methods.

Why do we need Celery?

Celery is an open-source Python library which is used to run the tasks asynchronously. It is a task queue that holds the tasks and distributes them to the workers in a proper manner. It is primarily focused on real-time operation but also supports scheduling (run regular interval tasks).


1 Answers

An example for how I might do it would be as follows. Also, instead of using traditional datetime, if you are including timezones in your Django App, you will probably want to use the timezone library instead found here.

models.py

class CarrierCompany(models.Model):
    ...

    @property
    def is_insurance_expired(self):
        from django.utils import timezone
        if self.insurance_expiration > timezone.datetime.today():
            print("Insurance Active")
            return True
        else:
            print("Insurance Active")
            return False

tasks.py

def insurance_date():
    carriers = CarrierCompany.objects.all()
    for carrier in carriers:
        if carrier.is_insurance_expired:
            carrier.insurance_active = True
            carrier.save()
        else:
            carrier.insurance_active = False
            carrier.save()

There are other things you could do, like not update it if its False and make the default False, or vice versa in the True case. You could also just do all of that in the models function, though I personally like to keep logic a bit separate (just how I organize my stuff). Hopefully this helps get you past where you are stuck though.

like image 154
CoolestNerdIII Avatar answered Sep 25 '22 04:09

CoolestNerdIII