Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Importing Celery in Flask Blueprints

I have a Flask Application with an MVC structure:

my_app
├── server.py
├── requirements.txt
├── models
│   ├── __init__.py
    └── model.py
├── controllers
    ├── __init__.py
    ├── client_controllers
        └──controller.py
    └── another_controller.py
└── templates

I use blueprints to split the server code in "controllers" so I have something like this:

server.py:

from flask import Flask
from celery import Celery

from controllers.client_controllers.controller import controller

app = Flask(__name__)
app.secret_key = 'SECRET'

app.register_blueprint(controller)

# Celery Configuration
def make_celery(app):

    celery = Celery(app.import_name, backend=app.config['CELERY_RESULT_BACKEND'],
                    broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = make_celery(app)


if __name__ == "__main__":
    app.run(host='0.0.0.0', debug=True)

controller.py:

from flask import Blueprint, render_template, json, request, redirect, url_for, abort, session



controller = Blueprint('controller', __name__,
                                     template_folder='templates/')

@celery.task()
def add_together(a, b):
    return a + b

@controller.route('/add', methods=['GET'])
def add():
    result = add_together.delay(23, 42)
    result.wait()
    return 'Processing'

As you may notice, celery is not imported into the controller, because I don't know how to import the celery instance from server.py into my controller.py without getting an error, I've been trying with:

from ...server import celery
from ..server import celery
...etc

but still failing with errors.

like image 573
Sebas Pinto Avatar asked Dec 31 '22 09:12

Sebas Pinto


2 Answers

The flask Error RuntimeError: Working outside of application context. happens because you are not in a Flask application_context().

You should use celery shared_task which is what you need given your MVC structure.

celery_flask/
├── celery_tasks
│   ├── app_tasks.py
│   ├── __init__.py
├── celery_worker.py
├── controllers
│   ├── __init__.py
│   ├── some_controller.py
├── __init__.py
└── server.py

Script app_tasks.py

#=====================
#   app_tasks.py
#=====================
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task(name='celery_tasks.add_together')
def add_together(x, y):
    return x + y

The @shared_task decorator returns a proxy that always points to the active Celery instances:

>>> from celery import Celery, shared_task
>>> @shared_task
... def add_together(x, y):
...     return x + y
...
>>> app1 = Celery(broker='amqp://')
>>> add_together.app is app1
True
>>> app2 = Celery(broker='redis://')
>>> add_together.app is app2
True

After you define you task you can call them using a reference to a Celery app. This celery app could be part of the flask application_context(). Example:

Script server.py

from __future__ import absolute_import
from flask import Flask
from celery import Celery

from controllers.some_controller import controller

flask_app = Flask(__name__)
flask_app.secret_key = 'SECRET'

flask_app.register_blueprint(controller)

# Celery Configuration
def make_celery( app ):
    celery = Celery('flask-celery-app', backend=app.config['CELERY_RESULT_BACKEND'],
                    broker=app.config['CELERY_BROKER_URL'],
                    include=['celery_tasks.app_tasks'])
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

def list_celery_task( ):
    from celery.task.control import inspect
    i = inspect()
    i.registered_tasks()
    from itertools import chain
    t = set(chain.from_iterable( i.registered_tasks().values() ))
    print "registered_tasks={}".format( t )
#======================================
#                MAIN
#======================================
flask_app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = make_celery(flask_app)
flask_app.celery = celery
list_celery_task( )


if __name__ == "__main__":
    flask_app.run(host='0.0.0.0', debug=True)

Script some_controller.py

#============================
#   some_controller.py
#============================
from __future__ import absolute_import
from flask import Blueprint
from flask import current_app

controller = Blueprint('controller', __name__,
                                     template_folder='templates/')

@controller.route('/add', methods=['GET'])
def add():
    print "calling add"
    result = current_app.celery.send_task('celery_tasks.add_together',args=[12,6])
    r = result.get()
    print 'Processing is {}'.format( r )
    return 'Processing is {}'.format( r )

Finally, start the worker to consume the tasks:

celery -A celery_worker worker --loglevel=DEBUG

Script celery_worker.py

#============================
#   celery_worker.py
#============================
from __future__ import absolute_import
from celery import Celery

# Celery Configuration
def make_celery():
    celery = Celery('flask-celery-app', backend='redis://localhost:6379',
                    broker='redis://localhost:6379',
                    include=['celery_tasks.app_tasks'])
    return celery

celery = make_celery()
print "tasks={}".format( celery.tasks.keys() )
like image 107
ssuarezbe Avatar answered Jan 05 '23 16:01

ssuarezbe


One option is to assign celery instance to the app instance and then access it through flask's current_app.

In you server.py, just add:

celery = make_celery(app)
app.celery = celery

Then you can access this in your controller.py:

from flask import current_app

@current_app.celery.task()
def add_together(a, b):
    return a + b
like image 41
D Dhaliwal Avatar answered Jan 05 '23 15:01

D Dhaliwal