I am trying to use Django's post_save signal in combination with Celery tasks. After a new Message object is saved to the database, I want to check whether an attribute on the instance has one of two values and, if it does, call send_sms_message, which is a registered Celery task.
tasks.py
from my_project.celery import app

@app.task
def send_sms_message(message):
    # Do something
    pass
signals.py
from django.db.models.signals import post_save
from django.dispatch import receiver
import rollbar
rollbar.init('234...0932', 'production')
from dispatch.models import Message
from comm.tasks import send_sms_message

@receiver(post_save, sender=Message)
def send_outgoing_messages(sender, instance, **kwargs):
    if instance.some_attribute == 'A' or instance.some_attribute == 'B':
        try:
            send_sms_message.delay(instance)
        except:
            rollbar.report_exc_info()
    else:
        pass
I'm testing this locally by running a Celery worker. When I call the Celery task from the Django shell, it works as expected. However, when I save a Message instance to the database, it does not: nothing is posted to the task queue and I do not see any error messages.
What am I doing wrong?
This looks like a problem with serialization and/or your settings. When Celery passes the message to your broker, it needs some representation of the data. Celery serializes the arguments you give a task, but if the serializer configuration is inconsistent with what you are passing (e.g. the worker is expecting JSON but you send it a pickled Python object), tasks can fail simply because the worker can't decode what you're sending it. If you run the function in your shell (without the call to delay), it is called synchronously, so there is no serialization or message passing.
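To make the serialization point concrete, here is a minimal sketch using only the standard library. The Message class below is a hypothetical stand-in for the Django model, and json.dumps stands in for the serializer; Celery's own JSON serializer may report the failure differently, so treat this as an illustration of the idea rather than Celery's exact behaviour.

```python
import json


class Message:
    """Hypothetical stand-in for the Django model (not the real ORM class)."""

    def __init__(self, pk, some_attribute):
        self.pk = pk
        self.some_attribute = some_attribute


def can_serialize(value):
    """Return True if json.dumps accepts the value, False if it raises TypeError."""
    try:
        json.dumps(value)
        return True
    except TypeError:
        return False


msg = Message(pk=42, some_attribute="A")

# An arbitrary object has no JSON representation, so encoding it fails.
instance_ok = can_serialize(msg)

# A plain integer (the primary key) encodes without trouble.
pk_ok = can_serialize(msg.pk)
```

This is why passing the primary key to the task and re-fetching the row inside the worker tends to be the safer pattern.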
In your settings you should be using JSON serialization (unless you have a really good reason not to); if you are not, there could be something wrong with your pickling. You can always increase the log level to debug when you run Celery to see more about serialization-related errors:
celery -A yourapp worker -l debug
When in doubt, use a print statement/function to make sure your signal receiver is running. If it is not, you can create an AppConfig class that imports your receivers in its ready() method, or use some other reasonable technique for making sure your receivers are registered.
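The reason the import matters can be sketched with a toy registry. Everything below (the _receivers list, the receiver decorator, the send function) is a hypothetical stand-in written for illustration and is not Django's implementation; it only shows that a decorated handler is registered at the moment its module is executed, and not before.

```python
# Toy stand-in for a signal registry; this is NOT Django's implementation.
_receivers = []


def receiver(func):
    """Register func as a handler. Runs when the defining module is imported."""
    _receivers.append(func)
    return func


def send(instance):
    """Call every registered handler and collect the return values."""
    return [handler(instance) for handler in _receivers]


# Nothing has "imported signals.py" yet, so no handler runs.
before_import = send({"id": 1})


# Importing signals.py would execute this decorated definition.
@receiver
def send_outgoing_messages(instance):
    return f"queued message {instance['id']}"


# Now the handler is registered and fires on send.
after_import = send({"id": 1})
```

In a real project the equivalent of that decorated definition lives in signals.py, so if nothing ever imports that module the handler silently never fires. Importing it from the app's AppConfig ready() method is one way to make sure it runs at startup.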
[opinion] I would suggest doing something like this:
@receiver(post_save, sender=Message)
def send_outgoing_messages(sender, instance, **kwargs):
    enqueue_message.delay(instance.id)
In yourmodule/tasks.py:
@app.task
def enqueue_message(message_id):
    msg = Message.objects.get(id=message_id)  # re-fetch the row inside the worker
    if msg.some_attribute in ('A', 'B'):  # slick or
        send_sms_message.delay(message_id)
You can always use Celery's composition techniques but here you have something that doesn't add more complexity to your request/response cycle. [/opinion]
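The expression if instance.some_attribute == 'A' or 'B' is probably your problem: Python parses it as (instance.some_attribute == 'A') or 'B', and the non-empty string 'B' is always truthy, so the condition is always true.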
What you probably mean is:
if instance.some_attribute == 'A' or instance.some_attribute == 'B'
Or, how I would write it:
if instance.some_attribute in ('A', 'B')
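A quick way to see the difference between the two forms is to run both against a few values. The function names buggy_check and fixed_check are made up for this sketch.

```python
def buggy_check(value):
    # Parsed as (value == 'A') or 'B'. The non-empty string 'B' is truthy,
    # so this returns True for every input.
    return bool(value == 'A' or 'B')


def fixed_check(value):
    # Membership test: True only when value is exactly 'A' or 'B'.
    return value in ('A', 'B')


# Map each sample value to (buggy result, fixed result).
results = {v: (buggy_check(v), fixed_check(v)) for v in ('A', 'B', 'C')}
```

For 'C' the buggy form still evaluates to True while the membership test evaluates to False, which is the whole difference.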