I have a rather long-running task that needs to be executed after inserting or updating a specific model. I decided to use the post_save signal instead of overriding the save method to reduce coupling. Since Django signals are not asynchronous, I had to run the long-running job as a Celery task (Celery is already in our stack).
A simplified version of my signal handler is as follows:
from django.db.models.signals import post_save
from django.dispatch import receiver

@receiver(post_save, sender=MyModel)
def my_model_post_save(sender, instance, **kwargs):
    handle_save_task.apply_async(args=(instance.pk,))
Also, because the job runs asynchronously, I pass the primary key of the object instead of the instance itself:
from django.core.exceptions import ObjectDoesNotExist

@app.task(queue='elastic')
def handle_save_task(instance_pk):
    try:
        instance = MyModel.objects.get(pk=instance_pk)
    except ObjectDoesNotExist:
        # Abort
        logger.warning("Saved object was deleted before this task got a chance to be executed [id = %d]", instance_pk)
    else:
        # Do my things with instance
        ...
The actual problem is that when the Celery task is executed, it can't access the newly saved instance, just as if it had been executed before the save! (Wasn't the signal called post_save? How ironic.)
By "executed before saving" I mean this: if a new instance is being inserted into the DB, I get a DoesNotExist exception in the Celery task; if the instance was already in the DB and save was called to update some of its properties, the Celery task gets the old instance with the old property values.
A workaround is to run the Celery tasks with a delay of a few seconds, but that is obviously not a good solution, and it cannot guarantee correct behavior under heavy load or long network delays.
Am I doing it completely wrong, or can I make it work with some slight modifications?
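This is probably caused by your update being performed inside a transaction. The signal handler runs, and the Celery task is queued, before that transaction is committed. A worker that picks up the task before the commit reads through its own database connection, so it sees either no row (for an insert) or the old values (for an update).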
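To make the visibility problem concrete, here is a minimal sketch that uses only Python's standard sqlite3 module rather than Django or Celery. The table name my_model, the file name, and the function demo_visibility are made up for illustration; the "reader" connection stands in for the Celery worker, and the "writer" connection stands in for the request that calls save inside a transaction.

```python
import os
import sqlite3
import tempfile


def demo_visibility():
    """Return what a second connection sees before and after the writer commits."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        path = os.path.join(tmp_dir, "demo.sqlite3")  # hypothetical file name
        writer = sqlite3.connect(path)  # plays the role of the saving request
        reader = sqlite3.connect(path)  # plays the role of the Celery worker
        try:
            writer.execute(
                "CREATE TABLE my_model (id INTEGER PRIMARY KEY, name TEXT)"
            )
            writer.commit()

            # The INSERT opens a transaction that is not committed yet.
            writer.execute("INSERT INTO my_model (id, name) VALUES (1, 'new')")

            # The other connection cannot see the uncommitted row.
            before = reader.execute(
                "SELECT name FROM my_model WHERE id = 1"
            ).fetchall()

            writer.commit()

            # After the commit the row is visible to everyone.
            after = reader.execute(
                "SELECT name FROM my_model WHERE id = 1"
            ).fetchall()
            return before, after
        finally:
            writer.close()
            reader.close()


before_commit, after_commit = demo_visibility()
print(before_commit)  # []
print(after_commit)   # [('new',)]
```

The empty first result corresponds to the DoesNotExist case in the question; with an UPDATE instead of an INSERT, the first read would return the old value.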
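You can try the following change, which uses transaction.on_commit to queue the task only after the transaction has been committed successfully: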
from django.db import transaction
from django.db.models.signals import post_save
from django.dispatch import receiver

@receiver(post_save, sender=MyModel)
def my_model_post_save(sender, instance, **kwargs):
    transaction.on_commit(lambda: handle_save_task.apply_async(args=(instance.pk,)))
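For intuition about why deferring the dispatch helps, the following is a toy model of commit hooks in plain Python. It is not Django's implementation: ToyTransaction, fake_task, and run_demo are hypothetical names, and the dictionary named committed merely stands in for the data that a worker on another connection could read.

```python
class ToyTransaction:
    """Toy stand-in for a DB transaction with commit hooks (not Django's code)."""

    def __init__(self, committed):
        self.committed = committed  # rows visible to other connections
        self.pending = {}           # writes that are not committed yet
        self.callbacks = []         # hooks to run after a successful commit

    def save(self, pk, value):
        self.pending[pk] = value

    def on_commit(self, func):
        self.callbacks.append(func)

    def commit(self):
        # Publish the writes first, then run the hooks in registration order.
        self.committed.update(self.pending)
        self.pending.clear()
        callbacks, self.callbacks = self.callbacks, []
        for func in callbacks:
            func()

    def rollback(self):
        # Discard both the writes and the hooks.
        self.pending.clear()
        self.callbacks.clear()


def run_demo():
    committed = {}  # what a worker on another connection could read
    seen = []       # what each simulated task observed

    def fake_task(pk):
        seen.append(committed.get(pk))

    txn = ToyTransaction(committed)
    txn.save(1, "new")
    fake_task(1)                         # dispatched immediately: row not visible
    txn.on_commit(lambda: fake_task(1))  # deferred until the commit
    txn.commit()
    return seen


print(run_demo())  # [None, 'new']
```

The immediate call observes nothing, while the deferred call observes the saved value. Note also that in this sketch, as with Django's transaction.on_commit, a rollback discards the registered callback, so no task is queued for data that was never saved.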