
How to use Flask-SQLAlchemy in a Celery task

I recently switched to Celery 3.0. Before that I was using Flask-Celery to integrate Celery with Flask. Although it had many issues, such as hiding some powerful Celery functionality, it allowed me to use the full context of the Flask app, and especially Flask-SQLAlchemy.

In my background tasks I process data and use the SQLAlchemy ORM to store it. The maintainer of Flask-Celery has dropped support for the plugin. The plugin pickled the Flask instance into the task, so I had full access to SQLAlchemy.

I am trying to replicate this behavior in my tasks.py file but with no success. Do you have any hints on how to achieve this?

PanosJee asked Aug 20 '12 20:08



2 Answers

Update: We've since started using a better way to handle application setup and teardown on a per-task basis, based on the pattern described in the more recent Flask documentation.

extensions.py

    import flask
    from flask_sqlalchemy import SQLAlchemy  # flask.ext.* imports were removed in Flask 1.0
    from celery import Celery


    class FlaskCelery(Celery):

        def __init__(self, *args, **kwargs):
            super(FlaskCelery, self).__init__(*args, **kwargs)
            self.patch_task()

            if 'app' in kwargs:
                self.init_app(kwargs['app'])

        def patch_task(self):
            TaskBase = self.Task
            _celery = self

            class ContextTask(TaskBase):
                abstract = True

                def __call__(self, *args, **kwargs):
                    # Reuse an active app context; otherwise push one for this task.
                    if flask.has_app_context():
                        return TaskBase.__call__(self, *args, **kwargs)
                    else:
                        with _celery.app.app_context():
                            return TaskBase.__call__(self, *args, **kwargs)

            self.Task = ContextTask

        def init_app(self, app):
            self.app = app
            self.config_from_object(app.config)


    celery = FlaskCelery()
    db = SQLAlchemy()

app.py

    from flask import Flask
    from extensions import celery, db


    def create_app():
        app = Flask(__name__)  # Flask requires an import name

        # configure/initialize all your extensions
        db.init_app(app)
        celery.init_app(app)

        return app

Once you've set up your app this way, you can run and use Celery without explicitly running it inside an application context: every task automatically runs in an application context if one isn't already active. You also don't have to handle post-task teardown yourself, which is an important issue to manage (see other responses below).
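
To make the behaviour of the patched task class easier to see in isolation, here is a minimal sketch of the same "push a context only if none is active" logic. It uses stand-in classes instead of Flask and Celery: FakeApp, BaseTask, make_context_task and the module-level has_app_context are all hypothetical names for illustration, not the real library APIs.

```python
import contextvars
from contextlib import contextmanager

# Stand-in for Flask's application-context tracking (not Flask's real API).
_active_app = contextvars.ContextVar("active_app", default=None)


class FakeApp:
    """Hypothetical stand-in for a Flask app; counts context pushes."""

    def __init__(self):
        self.pushes = 0

    @contextmanager
    def app_context(self):
        self.pushes += 1
        token = _active_app.set(self)
        try:
            yield self
        finally:
            _active_app.reset(token)


def has_app_context():
    return _active_app.get() is not None


class BaseTask:
    """Hypothetical stand-in for Celery's Task base class."""

    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)


def make_context_task(app):
    class ContextTask(BaseTask):
        def __call__(self, *args, **kwargs):
            # Reuse an active context; otherwise push one for this call only.
            if has_app_context():
                return BaseTask.__call__(self, *args, **kwargs)
            with app.app_context():
                return BaseTask.__call__(self, *args, **kwargs)

    return ContextTask


app = FakeApp()


class WhoAmI(make_context_task(app)):
    def run(self):
        return has_app_context()


task = WhoAmI()
ran_in_context = task()      # no context active: one is pushed for the call
with app.app_context():
    task()                   # context already active: it is reused
```

The context pushed for a bare call is popped again as soon as the task returns, which is why no separate teardown hook is needed in this arrangement.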

Troubleshooting

If you keep getting AttributeError: 'FlaskCelery' object has no attribute 'app' at the line "with _celery.app.app_context():", make sure to:

  1. Keep the celery import at the app.py file level. Avoid:

app.py

    from flask import Flask


    def create_app():
        app = Flask(__name__)

        initialize_extensions(app)

        return app


    def initialize_extensions(app):
        from extensions import celery, db  # DOOMED! Keep celery import at the FILE level

        db.init_app(app)
        celery.init_app(app)

  2. Start your Celery workers BEFORE you run Flask, using:

    celery worker -A app:celery -l info -f celery.log

Note the app:celery, i.e. loading from app.py.

You can still import from extensions to decorate tasks, i.e. from extensions import celery.
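
For what it's worth, the error message itself only says that init_app was never called on the celery instance the worker ended up with, since that is the only place the app attribute is set. A tiny sketch with a stand-in class (FakeFlaskCelery is hypothetical and mirrors only the init_app method of FlaskCelery) reproduces the same failure:

```python
class FakeFlaskCelery:
    """Hypothetical stand-in mirroring only FlaskCelery.init_app."""

    def init_app(self, app):
        self.app = app


celery = FakeFlaskCelery()

# Before init_app has run, the attribute simply does not exist.
try:
    celery.app
    error = None
except AttributeError as exc:
    error = str(exc)

# After init_app, tasks can reach the app through the instance.
celery.init_app(object())
```

So when debugging, it may help to check whether the code path that calls init_app actually runs in the worker process.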

Old answer below, still works, but not as clean a solution

I prefer to run all of celery within the application context by creating a separate file that invokes celery.start() with the application's context. This means your tasks file doesn't have to be littered with context setup and teardowns. It also lends itself well to the flask 'application factory' pattern.

extensions.py

    from flask_sqlalchemy import SQLAlchemy  # flask.ext.* imports were removed in Flask 1.0
    from celery import Celery

    db = SQLAlchemy()
    celery = Celery()

tasks.py

    from extensions import celery, db
    from flask.globals import current_app
    from celery.signals import task_postrun


    @celery.task
    def do_some_stuff():
        current_app.logger.info("I have the application context")
        # you can now use the db object from extensions


    @task_postrun.connect
    def close_session(*args, **kwargs):
        # Flask SQLAlchemy will automatically create new sessions for you from
        # a scoped session factory, given that we are maintaining the same app
        # context, this ensures tasks have a fresh session (e.g. session errors
        # won't propagate across tasks)
        db.session.remove()

app.py

    from flask import Flask
    from extensions import celery, db


    def create_app():
        app = Flask(__name__)  # Flask requires an import name

        # configure/initialize all your extensions
        db.init_app(app)
        celery.config_from_object(app.config)

        return app

RunCelery.py

    from app import create_app
    from extensions import celery

    app = create_app()

    if __name__ == '__main__':
        with app.app_context():
            celery.start()
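
The reason db.session.remove() in the task_postrun handler gives the next task a fresh session can be illustrated with a simplified model of a scoped session. This is only a sketch: FakeSession and ScopedRegistry are hypothetical stand-ins, not SQLAlchemy's implementation, and they model only the "create on demand, close and discard on remove" behaviour.

```python
import threading


class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy session."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class ScopedRegistry:
    """Simplified model of a scoped session: one per thread, created on demand."""

    def __init__(self, factory):
        self._factory = factory
        self._local = threading.local()

    def __call__(self):
        if not hasattr(self._local, "session"):
            self._local.session = self._factory()
        return self._local.session

    def remove(self):
        # Close and discard the current session, if there is one.
        session = getattr(self._local, "session", None)
        if session is not None:
            session.close()
            del self._local.session


registry = ScopedRegistry(FakeSession)

first = registry()       # created lazily while task 1 runs
same = registry()        # the same session is reused within the task
registry.remove()        # what the task_postrun handler does
second = registry()      # task 2 gets a brand-new session
```

In this model a failed transaction in one task cannot leak into the next one, because the next task never sees the old session object.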
Paul Gibbs answered Sep 21 '22 02:09


I used Paul Gibbs' answer with two differences. Instead of task_postrun I used worker_process_init. And instead of .remove() I used db.session.expire_all().

I'm not 100% sure, but from what I understand the way this works is when Celery creates a worker process, all inherited/shared db sessions will be expired, and SQLAlchemy will create new sessions on demand unique to that worker process.

So far it seems to have fixed my problem. With Paul's solution, when one worker finished and removed the session, another worker using the same session was still running its query, so db.session.remove() closed the connection while it was being used, giving me a "Lost connection to MySQL server during query" exception.

Thanks Paul for steering me in the right direction!

Never mind, that didn't work. I ended up adding an argument to my Flask app factory so that it skips db.init_app(app) when Celery is the caller. The workers call it themselves after Celery forks them. I now see several connections in my MySQL processlist.

    from extensions import db
    from celery.signals import worker_process_init
    from flask import current_app


    @worker_process_init.connect
    def celery_worker_init_db(**_):
        db.init_app(current_app)
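
The factory argument mentioned above isn't shown, so here is a rough sketch of what it might look like. Everything in it is an assumption for illustration: the init_db parameter name, the FakeDb and FakeApp stand-in classes, and the on_worker_process_init helper (which in real code would be the handler connected to worker_process_init).

```python
class FakeDb:
    """Hypothetical stand-in for the Flask-SQLAlchemy db object."""

    def __init__(self):
        self.initialized_for = []

    def init_app(self, app):
        self.initialized_for.append(app)


class FakeApp:
    """Hypothetical stand-in for a Flask app."""


db = FakeDb()


def create_app(init_db=True):
    # init_db is an assumed name; the answer does not show the real signature.
    app = FakeApp()
    if init_db:
        db.init_app(app)
    return app


def on_worker_process_init(app):
    # In real code this would be connected to celery.signals.worker_process_init.
    db.init_app(app)


worker_app = create_app(init_db=False)   # parent Celery process: no db setup
before_fork = list(db.initialized_for)   # snapshot: nothing initialized yet
on_worker_process_init(worker_app)       # each forked worker sets up its own
```

The point of the split is that database setup happens after the fork, so each worker process ends up with its own connections rather than sharing ones inherited from the parent.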
Robpol86 answered Sep 19 '22 02:09