Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Django celery worker to send real-time status and result messages to front end

In a django app I'm running async tasks and would like to show progress, errors etc to the user. If there are errors, the user should be redirect to a page where additional input or some action is required to fix the problem. What is the best way to communicate from the celery work back to the front end?

Here's a basic structure in pseudo code:

# views.py
from tasks import run_task

def view_task():
    run_task.delay()
    return render(request, 'template.html')

# tasks.py
from compute_module import compute_fct

@shared_task
def run_task():
    result = compute_fct()

    # how to catch status update messages from compute_module while compute_fct is running??

    if result == 'error':
        handle_error()
    else:
        handle_succes()     

# compute_module
import pandas as pd

def compute_fct():
    # send message: status = loading file
    df = pd.read_csv('test.csv')
    # send message: status = computing
    val = df['col'].mean()

    if val is None:
        return {'status':'error'}
    else:
        return {'status':'success','val':val}

What I would ideally want:

  • compute_module.py module uses python native logger. By separation of duties I want to keep the logging as generic as possible and use the standard python/django loggers. But they don't seem to be designed to send messages to front end.
  • celery task somehow handles the logs and instead of displaying them on stdout redirects them to pusher
  • front-end js shows and handles the messages

There might be standard ways of communicating between celery worker and front end that I'm not aware off. this scenario must happen often and I am surprised it's so difficult to implement. in a way the rabbitmq message queue or aws sns should be designed for this. below are resources that I looked at but don't feel either of them work very well but maybe I am just confused.

logging: this seems to be more about logging on the server side, not sending messages to user

  • http://docs.celeryproject.org/en/latest/userguide/tasks.html#logging
  • https://docs.djangoproject.com/en/2.0/topics/logging/
  • http://oddbird.net/2017/04/17/async-notifications/
  • https://www.google.com/search?q=celery+worker+send+message+to+front+end

Celery cam seems to be about admin monitoring tasks, not sending messages to user

  • http://docs.celeryproject.org/en/latest/userguide/monitoring.html

pusher I like but I don't want to have compute_module.py deal with it. That is For example I would prefer not to do any pusher.com integration inside compute_module.py. Guess I could pass a pusher object that has already been instantiated so the module can just push messages but again I would prefer it to be generic

  • https://blog.pusher.com/improve-user-experience-app-real-time-progress-bar-tutorial/
  • https://blog.pusher.com/django-pusherable/
like image 885
citynorman Avatar asked Dec 16 '17 01:12

citynorman


1 Answers

EDIT: Moved to django-channels now, works well but more complex than solution below.

Previous:

Ok so below is pseudo code for how I've solved it for now. Basically I use https://pusher.com/docs/javascript_quick_start and server-side pass the instantiated object into the compute_module. One downside is that the pusher messages are ephermeral so I'm going to have to do some extra work in LogPusher to store them in a db, something for another day...

Also in my real implementation I trigger the task via a $.post() ajax call in $(document).ready() because small tasks completed so fast the user would never see the pusher messages because the connection wasn't established (back to that historic message problem).

Another alternative route which I hadn't mentioned above is https://channels.readthedocs.io/en/latest/

[Edit] Another solutions is Server-sent events which has django implementations, havent tested it. But it looks good for uni-directional updates eg from server to client (vs websockets bidirectional). You would need a messaging system like redis pubsub to get updates to the server sse route.

Front-end updates from django server via pusher:

# views.py
from tasks import run_task

def view_task():
    run_task.delay('event')
    return render(request, 'template.html', 'pusher_event':'event')

    
# tasks.py
import pusher
from django.conf import settings
from compute_module import compute_fct

class LogPusher(object):
    def __init__(self, event):
        self.pusher_client = pusher.Pusher(app_id=settings.PUSHER_APP_ID,
                        key=settings.PUSHER_KEY,
                        secret=settings.PUSHER_SECRET,
                        cluster=settings.PUSHER_CLUSTER, ssl=True)
        self.event = event
        
    def send(self, data):
        self.pusher_client.trigger(settings.PUSHER_CHANNEL, self.event, json.dumps(data))

@shared_task
def run_task(pusher_event):
    
    log_pusher = LogPusher(pusher_event)
    result = compute_fct(log_pusher)

    # how to catch status update messages from compute_module while compute_fct is running??

    if result == 'error':
            log_pusher.send('status':'error')
    else:
            log_pusher.send('status':'success')

            
# compute_module.py
import pandas as pd

def compute_fct(log_pusher):
    # send message: status = loading file
    log_pusher.send('status':'loading file')
    df = pd.read_csv('test.csv')
    # send message: status = computing
    log_pusher.send('status':'computing')
    val = df['col'].mean()

    if val is None:
        return {'status':'error'}
    else:
        return {'status':'success','val':val}
        

# context_processors.py
# see https://stackoverflow.com/questions/433162/can-i-access-constants-in-settings-py-from-templates-in-django
from django.conf import settings 

def pusher(request):
    return {'PUSHER_KEY': settings.PUSHER_KEY, 'PUSHER_CLUSTER': settings.PUSHER_CLUSTER , 'PUSHER_CHANNEL': settings.PUSHER_CHANNEL }

        
# template.html
<script>
    
var pusher = new Pusher("{{PUSHER_KEY}}", {
  cluster: "{{PUSHER_CLUSTER}}",
  encrypted: true    
});

var channel = pusher.subscribe("{{PUSHER_CHANNEL}}");
channel.bind("{{pusher_event}}", function(data) {
    // process data
});

</script>
like image 170
citynorman Avatar answered Sep 21 '22 17:09

citynorman