handle `post_save` signal in celery

I have a rather long-running task that needs to be executed after inserting or updating a specific model.

I decided to use the post_save signal instead of overriding the save method, to reduce coupling. Since Django signals are not asynchronous, I had to run the long-running job as a Celery task (Celery is already in our stack).

A simplified version of my signal handling function is as follows:

from django.db.models.signals import post_save
from django.dispatch import receiver

@receiver(post_save, sender=MyModel)
def my_model_post_save(sender, instance, **kwargs):
    handle_save_task.apply_async(args=(instance.pk,))

Also, because the job is done asynchronously, I pass the primary key of the object instead of the instance itself.

@app.task(queue='elastic')
def handle_save_task(instance_pk):
    try:
        instance = MyModel.objects.get(pk=instance_pk)
    except ObjectDoesNotExist:
        # Abort: the row is not visible to this worker
        logger.warning("Saved object was deleted before this task got a chance to be executed [id = %d]", instance_pk)
    else:
        # Do my things with instance
        pass

The actual problem is that when the Celery task is executed, it can't access the newly saved instance, just as if it had been executed prior to saving! (Wasn't the signal called post_save? How ironic.)

By "executed before saving" I mean two things. If a new instance is being inserted into the DB, the Celery task gets a DoesNotExist exception. If the instance was already in the DB and save was called to update some of its properties, the Celery task gets the old instance with the old property values.

A workaround is to run the Celery tasks with a few seconds of delay, but that is obviously not a good solution, and it cannot guarantee correct behavior under heavy load or long network delays.

Am I doing it completely wrong, or can I make it work with some slight modifications?

Mohammad Jafar Mashhadi asked Jul 24 '17 09:07


1 Answer

This is probably caused by your update being performed inside a transaction. The transaction is committed only after the Celery task has already started, so the task still sees the old value (or, for a newly inserted instance, no row at all) when it runs.
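The visibility problem is not specific to Django or Celery. As a rough illustration only, the sketch below assumes SQLite through Python's standard sqlite3 module in place of the real database, with two connections standing in for the web request and the Celery worker. The table name `my_model` and the helpers `count_rows` and `demo` are made up for this example.

```python
import os
import sqlite3
import tempfile


def count_rows(conn):
    """Return the number of rows in my_model visible to this connection."""
    cur = conn.execute("SELECT COUNT(*) FROM my_model")
    rows = cur.fetchall()  # exhaust the cursor so no read lock lingers
    cur.close()
    return rows[0][0]


def demo():
    """Insert a row on one connection and read it from another,
    once before and once after the writer commits."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.sqlite3")
        writer = sqlite3.connect(path)  # plays the role of the web request
        reader = sqlite3.connect(path)  # plays the role of the Celery worker
        try:
            writer.execute(
                "CREATE TABLE my_model (id INTEGER PRIMARY KEY, name TEXT)"
            )
            writer.commit()

            # sqlite3 implicitly opens a transaction before this INSERT,
            # so the new row stays uncommitted until writer.commit().
            writer.execute("INSERT INTO my_model (name) VALUES ('new')")
            seen_before_commit = count_rows(reader)

            writer.commit()
            seen_after_commit = count_rows(reader)
        finally:
            reader.close()
            writer.close()
    return seen_before_commit, seen_after_commit


if __name__ == "__main__":
    print(demo())
```

The reader finds no row while the writer's transaction is still open, and finds the row once it is committed. A task that a worker picks up before the commit is in the reader's position.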

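You can try the following change, which uses transaction.on_commit to enqueue the task only after the surrounding transaction has been committed: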

from django.db import transaction

@receiver(post_save, sender=MyModel)
def my_model_post_save(sender, instance, **kwargs):
    transaction.on_commit(lambda: handle_save_task.apply_async(args=(instance.pk,)))
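To make the ordering explicit, here is a toy model of the idea behind on_commit: callbacks are queued while the transaction is open, run on commit, and dropped on rollback. `ToyTransaction` and `run_demo` are hypothetical names invented for this sketch. It is not Django's implementation, only an illustration of the deferral.

```python
class ToyTransaction:
    """Hypothetical stand-in for a database transaction (not Django code)."""

    def __init__(self):
        self._callbacks = []

    def on_commit(self, func):
        # Remember the callback instead of running it right away.
        self._callbacks.append(func)

    def commit(self):
        # The data is durable at this point, so run the deferred callbacks.
        callbacks, self._callbacks = self._callbacks, []
        for func in callbacks:
            func()

    def rollback(self):
        # Nothing was saved, so the deferred callbacks are discarded.
        self._callbacks.clear()


def run_demo():
    events = []

    txn = ToyTransaction()
    events.append("save")
    txn.on_commit(lambda: events.append("enqueue task"))
    events.append("more work in the same transaction")
    txn.commit()

    rolled_back = ToyTransaction()
    rolled_back.on_commit(lambda: events.append("never enqueued"))
    rolled_back.rollback()

    return events


if __name__ == "__main__":
    print(run_demo())
```

In this model the task is enqueued only after all the work inside the transaction has finished and been committed, and it is never enqueued if the transaction rolls back.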

Matthijs Brouns answered Oct 24 '22 14:10