In a Django app I'm running async tasks and would like to show progress, errors etc. to the user. If there are errors, the user should be redirected to a page where additional input or some action is required to fix the problem. What is the best way to communicate from the Celery worker back to the front end?
Here's a basic structure in pseudo code:
# views.py
from django.shortcuts import render

from tasks import run_task

def view_task(request):
    # queue the Celery task and return immediately
    run_task.delay()
    return render(request, 'template.html')
# tasks.py
from celery import shared_task

from compute_module import compute_fct

@shared_task
def run_task():
    result = compute_fct()
    # how to catch status update messages from compute_module while compute_fct is running??
    if result['status'] == 'error':
        handle_error()    # placeholder, not defined here
    else:
        handle_success()  # placeholder, not defined here
# compute_module.py
import pandas as pd

def compute_fct():
    # send message: status = loading file
    df = pd.read_csv('test.csv')
    # send message: status = computing
    val = df['col'].mean()
    # mean() returns NaN (not None) when there is nothing to average
    if pd.isna(val):
        return {'status': 'error'}
    else:
        return {'status': 'success', 'val': val}
What I would ideally want:
compute_module.py uses Python's native logger. For separation of duties I want to keep the logging as generic as possible and use the standard Python/Django loggers, but they don't seem to be designed to send messages to the front end. There might be standard ways of communicating between a Celery worker and the front end that I'm not aware of. This scenario must come up often, and I am surprised it's so difficult to implement. In a way, the RabbitMQ message queue or AWS SNS should be designed for this. Below are resources that I looked at; I don't feel any of them works very well, but maybe I am just confused.
logging: this seems to be more about logging on the server side, not sending messages to the user
Celery cam: seems to be about admin monitoring of tasks, not sending messages to the user
pusher: I like it, but I don't want compute_module.py to deal with it. For example, I would prefer not to do any pusher.com integration inside compute_module.py. I guess I could pass in a pusher object that has already been instantiated so the module can just push messages, but again I would prefer it to be generic
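One way to keep compute_module.py generic is to let it log as usual and attach a custom logging handler from the task side. The following is only a minimal sketch under assumptions: FrontendLogHandler, ListSender and run_with_frontend_logging are hypothetical names, and the "sender" is any object with a send(dict) method (for example a thin wrapper around a pusher client). Only the standard library logging API is used.

```python
import logging


class FrontendLogHandler(logging.Handler):
    """Forward log records to any object exposing send(dict) (hypothetical helper)."""

    def __init__(self, sender, level=logging.INFO):
        super().__init__(level)
        self.sender = sender

    def emit(self, record):
        try:
            self.sender.send({
                "status": record.getMessage(),
                "level": record.levelname,
            })
        except Exception:
            # A delivery failure should not break the computation itself.
            self.handleError(record)


class ListSender:
    """Stand-in sender that only collects messages (assumption, for illustration)."""

    def __init__(self):
        self.messages = []

    def send(self, data):
        self.messages.append(data)


def compute_fct():
    """Stand-in for the real compute function: it only knows about logging."""
    logger = logging.getLogger("compute_module")
    logger.info("loading file")
    logger.info("computing")
    return {"status": "success"}


def run_with_frontend_logging(sender):
    """Attach the handler for one task run, then detach it again."""
    logger = logging.getLogger("compute_module")
    handler = FrontendLogHandler(sender)
    previous_level = logger.level
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    try:
        return compute_fct()
    finally:
        logger.removeHandler(handler)
        logger.setLevel(previous_level)
```

With this shape the compute module never imports anything front-end related; swapping pusher for another transport would only change the sender object passed in by the task.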
EDIT: Moved to django-channels now, works well but more complex than solution below.
Previous:
OK, so below is pseudocode for how I've solved it for now. Basically I use https://pusher.com/docs/javascript_quick_start and, server-side, pass the instantiated object into compute_module. One downside is that the pusher messages are ephemeral, so I'm going to have to do some extra work in LogPusher to store them in a db, something for another day...
Also, in my real implementation I trigger the task via a $.post() ajax call in $(document).ready(), because small tasks completed so fast that the user would never see the pusher messages: the connection wasn't established yet (back to that historic message problem).
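The "store them in a db" idea could look roughly like the sketch below: a wrapper that saves every message before passing it on, so a client that connects late can fetch the history first. This is an assumption-heavy illustration, not what I actually run: StoringSender, ListSender, history and the task_messages table are hypothetical, and sqlite3 stands in for what would more likely be a Django model.

```python
import json
import sqlite3


class ListSender:
    """Stand-in for the live sender, e.g. LogPusher (assumption, for illustration)."""

    def __init__(self):
        self.messages = []

    def send(self, data):
        self.messages.append(data)


class StoringSender:
    """Wrap a sender (anything with send(dict)) and keep a copy of each message."""

    def __init__(self, sender, conn, task_id):
        self.sender = sender
        self.conn = conn
        self.task_id = task_id
        conn.execute(
            "CREATE TABLE IF NOT EXISTS task_messages ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, "
            "task_id TEXT NOT NULL, payload TEXT NOT NULL)"
        )

    def send(self, data):
        # Store first, so the message survives even if live delivery fails.
        self.conn.execute(
            "INSERT INTO task_messages (task_id, payload) VALUES (?, ?)",
            (self.task_id, json.dumps(data)),
        )
        self.conn.commit()
        self.sender.send(data)


def history(conn, task_id):
    """Return all stored messages for one task, oldest first."""
    rows = conn.execute(
        "SELECT payload FROM task_messages WHERE task_id = ? ORDER BY id",
        (task_id,),
    )
    return [json.loads(payload) for (payload,) in rows]
```

The page could then render history(...) on load and rely on live messages only for whatever arrives afterwards.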
Another alternative route which I hadn't mentioned above is https://channels.readthedocs.io/en/latest/
[Edit] Another solution is server-sent events (SSE), which has Django implementations; I haven't tested it. But it looks good for uni-directional updates, e.g. from server to client (vs. bidirectional websockets). You would need a messaging system like Redis pub/sub to get updates to the server's SSE route.
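For reference, the SSE wire format itself is simple: each message is one or more "field: value" lines followed by a blank line. Below is a minimal, untested-in-production sketch of the framing only; format_sse and sse_stream are hypothetical helper names, and the source of the messages (e.g. a Redis pub/sub subscription) is left out.

```python
import json


def format_sse(data, event=None):
    """Serialize one message in the text/event-stream wire format."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # json.dumps emits no raw newlines by default, so a single data line is enough.
    lines.append(f"data: {json.dumps(data)}")
    # A blank line terminates the message.
    return "\n".join(lines) + "\n\n"


def sse_stream(messages, event="status"):
    """Yield one SSE frame per status dict from any iterable of messages."""
    for message in messages:
        yield format_sse(message, event=event)
```

In Django, a generator like sse_stream(...) could presumably be handed to StreamingHttpResponse with content_type="text/event-stream", and the browser would read it with EventSource; I'm assuming that wiring rather than having verified it end to end.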
Front-end updates from django server via pusher:
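# views.py
from django.shortcuts import render

from tasks import run_task

def view_task(request):
    run_task.delay('event')
    # the event name is passed to the template so the page can bind to it
    return render(request, 'template.html', {'pusher_event': 'event'})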
# tasks.py
import json

import pusher
from celery import shared_task
from django.conf import settings

from compute_module import compute_fct

class LogPusher(object):
    def __init__(self, event):
        self.pusher_client = pusher.Pusher(app_id=settings.PUSHER_APP_ID,
                                           key=settings.PUSHER_KEY,
                                           secret=settings.PUSHER_SECRET,
                                           cluster=settings.PUSHER_CLUSTER, ssl=True)
        self.event = event

    def send(self, data):
        self.pusher_client.trigger(settings.PUSHER_CHANNEL, self.event, json.dumps(data))

@shared_task
def run_task(pusher_event):
    log_pusher = LogPusher(pusher_event)
    # compute_fct pushes its own status updates through log_pusher while it runs
    result = compute_fct(log_pusher)
    if result['status'] == 'error':
        log_pusher.send({'status': 'error'})
    else:
        log_pusher.send({'status': 'success'})
# compute_module.py
import pandas as pd

def compute_fct(log_pusher):
    # send message: status = loading file
    log_pusher.send({'status': 'loading file'})
    df = pd.read_csv('test.csv')
    # send message: status = computing
    log_pusher.send({'status': 'computing'})
    val = df['col'].mean()
    # mean() returns NaN (not None) when there is nothing to average
    if pd.isna(val):
        return {'status': 'error'}
    else:
        return {'status': 'success', 'val': val}
# context_processors.py
# see https://stackoverflow.com/questions/433162/can-i-access-constants-in-settings-py-from-templates-in-django
from django.conf import settings

def pusher(request):
    return {
        'PUSHER_KEY': settings.PUSHER_KEY,
        'PUSHER_CLUSTER': settings.PUSHER_CLUSTER,
        'PUSHER_CHANNEL': settings.PUSHER_CHANNEL,
    }
# template.html
<script>
    var pusher = new Pusher("{{PUSHER_KEY}}", {
        cluster: "{{PUSHER_CLUSTER}}",
        encrypted: true
    });
    var channel = pusher.subscribe("{{PUSHER_CHANNEL}}");
    channel.bind("{{pusher_event}}", function(data) {
        // process data
    });
</script>