Django 2.1.1, Django Channels 2.1.3, Celery 4.2.1
I've set up a task in Celery and at the end of the task, I need to send a websocket message to the client(s). However, the websocket message is never sent. There are no errors thrown, it just simply doesn't send.
I've set up a channel layer using Redis as the backend. Doing this from a normal Django view works fine. But when run in a Celery task, it sends the message to Channels and I can see that Channels does indeed run the code shown in my consumers.py code below, but the client never receives the websocket message.
tasks.py
def import_job(self):
# (do task calculations, store in data dict)
message = {'type': 'send_my_data',
'data': json.dumps(thecalcs) }
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)('core-data', message)
consumers.py
class AsyncDataConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.channel_group_name = 'core-data'
# Join the group
await self.channel_layer.group_add(
self.channel_group_name,
self.channel_name
)
await self.accept()
async def disconnect(self, close_code):
# Leave the group
await self.channel_layer.group_discard(
self.channel_group_name,
self.channel_name
)
# Receive message from WebSocket
async def receive(self, text_data=None, bytes_data=None):
pass
# Receive message from the group
async def send_my_data(self, event):
text = event['data']
# Send message to WebSocket
await self.send(text_data=text)
settings.py
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {
"hosts": [('127.0.0.1', 6379)],
},
},
}
Since there is no exception/error, I am completely at a loss as to which part of this process is failing.
send()
? YesIs this a problem between Channels and Redis? Is it a problem between Channels and the client?
Celery tasks run asynchronously, which means that the Celery function call in the calling process returns immediately after the message request to perform the task is sent to the broker. There are two ways to get results back from your tasks.
Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task, the Celery client adds a message to the queue, and the broker then delivers that message to a worker. The most commonly used brokers are Redis and RabbitMQ.
It turns out Celery was swallowing an exception in my code during the task. I need to implement more thorough logging to catch these exceptions.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With