I have a Flask application, which is registered as follows:
APP = Flask(__name__)
APP.config.from_object('config')
I have defined a view for a URL, in which a function is called, which interacts with the DB.
from tasks import some_func
.
.
.
some_func.delay(params)
In the tasks.py file, I am creating a Celery instance as follows:
# Config values don't work here
celery = Celery('tasks', broker='amqp://', backend='amqp://')
.
.
.
@celery.task()
def some_func():
#DB interactions
Now I get an error that says:
RuntimeError: Working outside of application context.
I read about application contexts and how they can be used. I have imported current_app
in my tasks.py
file and tried using the context as follows:
@celery.task()
def some_func():
with current_app.app_context():
#DB interactions
However I still get the same error. I also tried pushing the context from the main file as follows:
ctx = APP.app_context()
ctx.push()
But no luck yet.
How do I make Celery work with Flask?
Note: I have already tried their example here.
Context:
The same error happened to me while trying to send email(s) through Celery
tasks.
In my case, I was sending emails that was rendered by Flask
with html templates.
Reason:
Celery
and Flask
are disconnected. Thus, Celery has no clue about application context of Flask. I had to manually inject application context.
Solution:
What worked for me was Flask's app_context()
function. Simply inject/wrap application context around the function where error originate from.
Example:
from app import app
from app import mail
@celery.task
def sign_up_task(**config, **context):
mail = Mail()
with app.app_context():
mail.send(**config, **context)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With