I'm really struggling to the get the proper setup for Flask, SQLAlchemy and Celery. I have searched extensively and tried different approaches, nothing really seems to work. Either I missed the application context or can't run the workers or there are some other problems. The structure is very general so that I can build a bigger application.
I'm using: Flask 0.10.1, SQLAlchemy 1.0, Celery 3.1.13, my current setup is the following:
app/__init__.py
#Empty
app/config.py
import os
basedir = os.path.abspath(os.path.dirname(__file__))
class Config:
@staticmethod
def init_app(app):
pass
class LocalConfig(Config):
DEBUG = True
SQLALCHEMY_DATABASE_URI = r"sqlite:///" + os.path.join(basedir,
"data-dev.sqlite")
CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'
config = {
"local": LocalConfig}
app/exstensions.py
from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery
db = SQLAlchemy()
celery = Celery()
app/factory.py
from extensions import db, celery
from flask import Flask
from flask import g
from config import config
def create_before_request(app):
def before_request():
g.db = db
return before_request
def create_app(config_name):
app = Flask(__name__)
app.config.from_object(config[config_name])
db.init_app(app)
celery.config_from_object(config)
# Register the blueprints
# Add the before request handler
app.before_request(create_before_request(app))
return app
app/manage.py
from factory import create_app
app = create_app("local")
from flask import render_template
from flask import request
@app.route('/test', methods=['POST'])
def task_simple():
import tasks
tasks.do_some_stuff.delay()
return ""
if __name__ == "__main__":
app.run()
app/models.py
from extensions import db
class User(db.Model):
__tablename__ = "user"
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(128), unique=True, nullable=False)
app/tasks.py
from extensions import celery
from celery.signals import task_prerun
from flask import g, current_app
@task_prerun.connect
def close_session(*args, **kwargs):
with current_app.app_context():
# use g.db
print g
@celery.task()
def do_some_stuff():
with current_app.app_context():
# use g.db
print g
In the folder app:
python.exe manage.py
celery.exe worker -A tasks
I get an import error that doesn't make any sense to me. Should I structure the application differently? At the end I think I want a quite basic setup, e.g. Using Flask with the factory pattern, be able to use the Flask-SQLAlchmey extension and have some worker that needs to access the database.
Any help is highly appreciated.
Traceback is executed when starting the celery worker.
Traceback (most recent call last):
File "[PATH]\scripts\celery-script.py", line 9, in <module>
load_entry_point('celery==3.1.13', 'console_scripts', 'celery')()
File "[PATH]\lib\site-packages\celery\__main__.py", line 30, in main
main()
File "[PATH]\lib\site-packages\celery\bin\celery.py", line 81, in main
cmd.execute_from_commandline(argv)
File "[PATH]\lib\site-packages\celery\bin\celery.py", line 769, in execute_from_commandline
super(CeleryCommand, self).execute_from_commandline(argv)))
File "[PATH]\lib\site-packages\celery\bin\base.py", line 305, in execute_from_commandline
argv = self.setup_app_from_commandline(argv)
File "[PATH]\lib\site-packages\celery\bin\base.py", line 473, in setup_app_from_commandline
user_preload = tuple(self.app.user_options['preload'] or ())
AttributeError: 'Flask' object has no attribute 'user_options'
UPDATE I change the code according to the suggestion in the comment. The worker starts now up but when test it with a get request to http://127.0.0.1:5000/test
. I get the following traceback:
Traceback (most recent call last):
File "[PATH]\lib\site-packages\celery\app\trace.py", line 230, in trace_task
args=args, kwargs=kwargs)
File "[PATH]\lib\site-packages\celery\utils\dispatch\signal.py", line 166, in send
response = receiver(signal=self, sender=sender, \**named)
File "[PATH]\app\stackoverflow\tasks.py", line 7, in close_session
with current_app.app_context():
File "[PATH]\lib\site-packages\werkzeug\local.py", line 338, in __getattr__
return getattr(self._get_current_object(), name)
File "[PATH]\lib\site-packages\werkzeug\local.py", line 297, in _get_current_object
return self.__local()
File "[PATH]\lib\site-packages\flask\globals.py", line 34, in _find_app
raise RuntimeError('working outside of application context')
RuntimeError: working outside of application context exc, exc_info.traceback)))
UPDATE Based on the comments from Marteen, I changed the code. The current working version is under: https://gist.github.com/anonymous/fa47834db2f4f3b8b257. Any further improvements or suggestions are welcome.
I was off with the current_app advice.
Your celery object needs access to the application context. I found some information online about creating the Celery object with a factory function. Example below is tested without a message broker.
#factory.py
from celery import Celery
from config import config
def create_celery_app(app=None):
app = app or create_app(config)
celery = Celery(__name__, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
and in tasks.py:
#tasks.py
from factory import create_celery_app
from celery.signals import task_prerun
from flask import g
celery = create_celery_app()
@task_prerun.connect
def celery_prerun(*args, **kwargs):
#print g
with celery.app.app_context():
# # use g.db
print g
@celery.task()
def do_some_stuff():
with celery.app.app_context():
# use g.db
g.user = "test"
print g.user
Some links:
Flask pattern for creating a Celery instance with factory function
Application using both application factory and celery
Source for said application's factory.py
Source for application tasks.py
Here is a solution which works with the flask application factory pattern and also creates celery task with context, without needing to use app.app_context()
explicitly in the tasks. In my app, it is really tricky to get that app object while avoiding circular imports, but this solves it. This is also good for the latest celery version 4.2 at time of writing.
Structure:
repo_name/
manage.py
base/
base/__init__.py
base/app.py
base/runcelery.py
base/celeryconfig.py
base/utility/celery_util.py
base/tasks/workers.py
So base
is the main application package in this example. In the base/__init__.py
we create the celery instance as below:
from celery import Celery
celery = Celery('base', config_source='base.celeryconfig')
The base/app.py
file contains the flask app factory create_app
and note the init_celery(app, celery)
it contains:
from base import celery
from base.utility.celery_util import init_celery
def create_app(config_obj):
"""An application factory, as explained here:
http://flask.pocoo.org/docs/patterns/appfactories/.
:param config_object: The configuration object to use.
"""
app = Flask('base')
app.config.from_object(config_obj)
init_celery(app, celery=celery)
register_extensions(app)
register_blueprints(app)
register_errorhandlers(app)
register_app_context_processors(app)
return app
Moving on to base/runcelery.py
contents:
from flask.helpers import get_debug_flag
from base.settings import DevConfig, ProdConfig
from base import celery
from base.app import create_app
from base.utility.celery_util import init_celery
CONFIG = DevConfig if get_debug_flag() else ProdConfig
app = create_app(CONFIG)
init_celery(app, celery)
Next, the base/celeryconfig.py
file (as an example):
# -*- coding: utf-8 -*-
"""
Configure Celery. See the configuration guide at ->
http://docs.celeryproject.org/en/master/userguide/configuration.html#configuration
"""
## Broker settings.
broker_url = 'pyamqp://guest:guest@localhost:5672//'
broker_heartbeat=0
# List of modules to import when the Celery worker starts.
imports = ('base.tasks.workers',)
## Using the database to store task state and results.
result_backend = 'rpc'
#result_persistent = False
accept_content = ['json', 'application/text']
result_serializer = 'json'
timezone = "UTC"
# define periodic tasks / cron here
# beat_schedule = {
# 'add-every-10-seconds': {
# 'task': 'workers.add_together',
# 'schedule': 10.0,
# 'args': (16, 16)
# },
# }
Now define the init_celery in the base/utility/celery_util.py
file:
# -*- coding: utf-8 -*-
def init_celery(app, celery):
"""Add flask app context to celery.Task"""
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
For the workers in base/tasks/workers.py
:
from base import celery as celery_app
from flask_security.utils import config_value, send_mail
from base.bp.users.models.user_models import User
@celery_app.task
def send_welcome_email(email, user_id, confirmation_link):
"""Background task to send a welcome email with flask-security's mail.
You don't need to use with app.app_context() as Task has app context.
"""
user = User.query.filter_by(id=user_id).first()
print(f'sending user {user} a welcome email')
send_mail(config_value('EMAIL_SUBJECT_REGISTER'),
email,
'welcome', user=user,
confirmation_link=confirmation_link)
@celery_app.task
def do_some_stuff():
print(g)
Then, you need to start the celery beat and celery worker in two different cmd prompts from inside the repo_name
folder.
In one cmd prompt do a celery -A base.runcelery:celery beat
and the other celery -A base.runcelery:celery worker
.
Then, run through your task that needed the flask context. Should work.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With