
How to make Celery worker return results from task

I have a Flask app that calls a task. The task pulls data from a database, plots line charts, and returns HTML content that is rendered on an HTML page. Without Celery, the Flask app works fine and renders the line chart on the client side. Now I want to delegate the task to Celery via a RabbitMQ broker. The task does run, as I can see the log output in the Celery shell, but the resulting HTML content never gets sent back to the Flask server app. How can I do that?

This is a follow-up to http://stackoverflow.com/questions/37839551/how-to-restart-flask-process-by-refreshing-page/38146003#38146003.

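#server-celery.py
from flask import Flask, render_template, request

import task  # the Celery task module (task.py below)

app = Flask(__name__)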

@app.route('/',methods=['GET'])
def index():
   return render_template("index.html")

@app.route('/', methods=['GET', 'POST'])
def plotdata():
   form = InputForm(request.form)
   if request.method == 'POST' and form.validate():
       lineChart = task.main.delay(form.startdate.data, form.enddate.data) 
       return render_template('view.html', form=form, result= lineChart)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)

And the task, running as a Celery worker:

#task.py
from celery import Celery

broker = 'amqp://localhost:5672'
app = Celery(__name__, broker=broker)

def queryDatabase(param1, param2):
   #query database, return results
   return data

def plotLineChart(data):
   #plot data as line Chart
   return LineChartHtmlcontent

@app.task
def main(param1,param2):
    data = queryDatabase(param1,param2)
    return plotLineChart(data)

And the client HTML page:

<!--view.html-->

<form method=post action="">
<table>
  {% for field in form %}
    <tr>
    <td>{{ field.label }}</td>
        <td>{{ field }}</td>

    </tr>
  {% endfor %}
</table>
<p><input type=submit value=Submit></p>
</form>

<p>
{% if result != None %}
<img src="{{ result }}" width="500">
{% endif %}
</p>
Asked Jul 08 '16 13:07 by ArchieTiger



1 Answer

A better solution is to let the task run asynchronously, as Celery is intended to be used, and have JavaScript on the page poll the task periodically to check its status.

First, when you create your Celery task, use the bind=True parameter. The task instance is then passed to the function as its first argument, self.

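@app.task(bind=True)
def main(self, param1, param2):
    # Reading states back later requires a result backend on the Celery app,
    # e.g. Celery(__name__, broker=broker, backend='rpc://').
    self.update_state(state='PENDING')
    data = queryDatabase(param1, param2)
    # Custom state; Celery sets SUCCESS itself once the function returns.
    self.update_state(state='COMPLETE')
    return plotLineChart(data)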

The task can now update its own state. The update_state method also accepts metadata, for example: self.update_state(state='PROGRESS', meta={'current': 0, 'total': 12})
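As a rough sketch of how the progress metadata could be reported step by step, the helper below runs a list of callables and reports after each one. The names run_with_progress, steps, and report are hypothetical and not part of Celery; report stands in for the bound task's self.update_state, which accepts the same state and meta keyword arguments.

```python
def run_with_progress(steps, report):
    """Run each step in order and report progress after every one.

    steps:  iterable of zero-argument callables (hypothetical unit of work).
    report: callable taking state= and meta= keywords, for example the
            update_state method of a bound Celery task.
    """
    steps = list(steps)
    total = len(steps)
    results = []
    for current, step in enumerate(steps, start=1):
        results.append(step())
        # Same shape as the meta dict shown above: how far along we are.
        report(state='PROGRESS', meta={'current': current, 'total': total})
    return results


# Stand-in for self.update_state so the sketch runs without a worker.
reported = []
run_with_progress(
    [lambda: 'query', lambda: 'plot'],
    lambda state, meta: reported.append((state, meta)),
)
print(reported[-1])  # prints ('PROGRESS', {'current': 2, 'total': 2})
```

Inside a bound task, the call would look like run_with_progress(steps, self.update_state).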

Next, add a route that the page can poll to see how the Celery task is doing.

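from flask import jsonify

# blueprint is assumed to be a Flask Blueprint; on a plain app, use @app.route.
# main is the Celery task defined above (import it from the task module).
@blueprint.route('/status/<task_id>', methods=['GET'])
def taskstatus(task_id=None):
    # Look up the task by its id; the state is read from the result backend.
    task = main.AsyncResult(task_id)
    response = {
        'state': task.state,
    }
    return jsonify(response)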
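Since the original question is about getting the chart HTML back, the status response could also carry the task's return value once it has finished. The sketch below is one possible shape, not the answer's own code: build_status_response is a hypothetical helper that takes plain values, so it runs without a worker. In a real view, the arguments would come from task.state and task.info, where info is the return value on success, the exception on failure, and the meta dict for custom states.

```python
def build_status_response(state, info):
    """Return a JSON-ready dict describing one task."""
    response = {'state': state}
    if state == 'SUCCESS':
        # On success, info holds the task's return value (here, the chart HTML).
        response['result'] = info
    elif state == 'FAILURE':
        # On failure, info holds the raised exception; send only its message.
        response['error'] = str(info)
    elif isinstance(info, dict):
        # Custom states such as PROGRESS carry the meta dict given to update_state.
        response['current'] = info.get('current', 0)
        response['total'] = info.get('total', 1)
    return response


print(build_status_response('PROGRESS', {'current': 3, 'total': 12}))
# prints {'state': 'PROGRESS', 'current': 3, 'total': 12}
```

The view would then return jsonify(build_status_response(task.state, task.info)), and the page can insert the result field into the document when the state is SUCCESS.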

The status route returns a JSON object when you poll it with the task id. You can get the task id from lineChart.id after you call .delay().
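The polling itself would normally be written in JavaScript on the page. For illustration only, here is the same loop sketched in Python. poll_until_ready and its parameters are hypothetical names; fetch_status stands in for whatever requests the status URL and decodes the JSON. SUCCESS, FAILURE, and REVOKED are the built-in Celery states after which a task no longer changes.

```python
import time

# Celery's built-in states that mean the task will not change state again.
READY_STATES = {'SUCCESS', 'FAILURE', 'REVOKED'}


def poll_until_ready(fetch_status, interval=1.0, max_attempts=30, sleep=time.sleep):
    """Call fetch_status() until the task reaches a ready state.

    fetch_status is any callable returning a dict with a 'state' key, such as
    the decoded JSON from the status route. sleep is injectable for testing.
    """
    for _ in range(max_attempts):
        status = fetch_status()
        if status['state'] in READY_STATES:
            return status
        sleep(interval)
    raise TimeoutError('task not ready after %d attempts' % max_attempts)


# Stand-in for the HTTP call: two unfinished responses, then a finished one.
fake_responses = iter([{'state': 'PENDING'}, {'state': 'PROGRESS'}, {'state': 'SUCCESS'}])
final = poll_until_ready(lambda: next(fake_responses), sleep=lambda seconds: None)
print(final['state'])  # prints SUCCESS
```

Capping the number of attempts keeps the client from polling forever if the worker is down or the task id is unknown, in which case the state stays PENDING.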

Please see the excellent tutorial by Miguel Grinberg: http://blog.miguelgrinberg.com/post/using-celery-with-flask

Answered Oct 21 '22 02:10 by Dan Safee