Sending message from Celery task to Channels

Django 2.1.1, Django Channels 2.1.3, Celery 4.2.1

I've set up a task in Celery, and at the end of the task I need to send a websocket message to the client(s). However, the websocket message is never sent. No errors are thrown; it simply doesn't send.

I've set up a channel layer using Redis as the backend. Sending the message from a normal Django view works fine. But when the same code runs in a Celery task, it sends the message to Channels, and I can see that Channels does run the consumers.py code shown below, yet the client never receives the websocket message.

tasks.py

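import json

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer

def import_job(self):
    # (do task calculations, store in thecalcs dict)
    message = {'type': 'send_my_data',
               'data': json.dumps(thecalcs)}
    # Hand the message to every consumer in the 'core-data' group
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)('core-data', message)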

consumers.py

from channels.generic.websocket import AsyncWebsocketConsumer

class AsyncDataConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.channel_group_name = 'core-data'

        # Join the group
        await self.channel_layer.group_add(
            self.channel_group_name,
            self.channel_name
        )
        await self.accept()

    async def disconnect(self, close_code):
        # Leave the group
        await self.channel_layer.group_discard(
            self.channel_group_name,
            self.channel_name
        )

    # Receive message from WebSocket
    async def receive(self, text_data=None, bytes_data=None):
        pass

    # Receive message from the group
    async def send_my_data(self, event):
        text = event['data']
        # Send message to WebSocket
        await self.send(text_data=text)

settings.py

CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            "hosts": [('127.0.0.1', 6379)],
        },
    },
}

Since there is no exception/error, I am completely at a loss as to which part of this process is failing.

  1. Celery triggers the task? Yes
  2. The task runs and sends a message to the channel layer? Yes
  3. The consumer receives the message from the group and executes the send()? Yes
  4. Client receives the websocket message? NO

Is this a problem between Channels and Redis? Is it a problem between Channels and the client?

Charles Koch asked Mar 25 '19 14:03



1 Answer

It turns out Celery was swallowing an exception raised by my own code during the task, so the failure never surfaced. I need to implement more thorough logging to catch exceptions like this.
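As a rough illustration of what "more thorough logging" could look like, here is a minimal sketch using only the standard library. The decorator name `log_exceptions`, the logger name `tasks`, and the stand-in function `import_job_body` are hypothetical and not from my actual project; the idea is just to log the full traceback and re-raise so the failure is no longer silent.

```python
import functools
import logging

logger = logging.getLogger("tasks")  # hypothetical logger name


def log_exceptions(func):
    """Log any exception raised by func with its traceback, then re-raise."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # logger.exception logs at ERROR level and attaches the traceback
            logger.exception("Task %s failed", func.__name__)
            raise
    return wrapper


@log_exceptions
def import_job_body():
    # Hypothetical stand-in for the real task body; the missing key
    # simulates the kind of hidden failure described above.
    thecalcs = {}
    return thecalcs["missing"]  # raises KeyError
```

In a real project the decorator would presumably sit directly beneath the Celery task decorator (for example `@shared_task(bind=True)`), so that the task function itself is wrapped. That placement is an assumption on my part and depends on how the task is declared.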

Charles Koch answered Oct 17 '22 00:10