Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Celery + Django Signals

I am trying to leverage the post_save function of Django Signals in combination with Celery tasks. After a new Message object is saved to the database, I want to evaluate if the instance has one of two attributes and if it does, call the 'send_sms_function' which is a Celery registered task.

tasks.py

from my_project.celery import app

@app.task
def send_sms_message(message):
    # Do something

signals.py

from django.db.models.signals import post_save
from django.dispatch import receiver

import rollbar
rollbar.init('234...0932', 'production')

from dispatch.models import Message
from comm.tasks import send_sms_message


@receiver(post_save, sender=Message)
def send_outgoing_messages(sender, instance, **kwargs):

if instance.some_attribute == 'A' or instance.some_attribute == 'B':
    try:
        send_sms_message.delay(instance)
    except:
         rollbar.report_exc_info()
else:
    pass

I'm testing this locally by running a Celery worker. When I am in the Django shell and call the Celery function, it works as expected. However when I save a Message instance to the database, the function does not work as expected: There is nothing posted to the task queue and I do not see any error messages.

What am I doing wrong?

like image 732
Joe Fusaro Avatar asked Jul 07 '15 16:07

Joe Fusaro


2 Answers

This looks like a problem with serializing and/or your settings. When celery passes the message to your broker, it needs to have some representation of the data. Celery serializes the arguments you give a task but if you don't have it configured consistently with what you're passing (i.e. you have a mismatch where your broker is expecting JSON but you send it a pickled python object), tasks can fail simply because the worker can't easily decode what you're sending it. If you run the function in your shell (without the call to delay) it is called synchronously so there is no serialization or message passing.

In your settings you should be using the JSON serialization (unless you have a really good reason) but if not, then there could be something wrong with your pickling. You can always increase the log level to debug when you run celery to see more about serialization related errors with:

celery -A yourapp worker -l debug

When in doubt, use that print statement/function to make sure your signal receiver is running. If not, you can create an AppConfig class that imports your receivers in it's ready method or some other reasonable technique for making sure your receivers are being registered.

[opinion] I would suggest doing something like this:

@receiver(post_save, sender=Message)
def send_outgoing_messages(sender, instance, **kwargs):
    enqueue_message.delay(instance.id)

in yourmodule/tasks.py

@app.task
def enqueue_message(message_id):
    msg = Message.object.get(id=message_id)
    if msg.some_attribute in ('A', 'B'):  # slick or
        send_sms_message.delay(message_id)

You can always use Celery's composition techniques but here you have something that doesn't add more complexity to your request/response cycle. [/opinion]

like image 189
theWanderer4865 Avatar answered Sep 30 '22 06:09

theWanderer4865


The expression if instance.some_attribute == 'A' or 'B' is probably your problem.

What you probably mean is:

if instance.some_attribute == 'A' or instance.some_attribute == 'B'

Or, how I would write it:

if instance.some_attribute in ('A', 'B')
like image 39
dgel Avatar answered Sep 30 '22 07:09

dgel