I'm not trying to use celery.current_app inside the view function itself, but I have a function hooked to the after_task_publish signal that uses it to update the task state after the task is published. This works when the task is sent from outside a Flask view function, and the state is updated correctly. When I send the task from inside a view function, though, the state is not updated. I checked, and the problem is that current_app.backend is an instance of DisabledBackend, which is the default, instead of an instance of RedisBackend, which is what I'm using.
This is happening because inside the Flask view function the proxy to the current Celery instance, celery.current_app, refers to the default instance, which is created when there is no current Celery instance.
I tried reproducing what's happening and here's a test script:
from __future__ import absolute_import, print_function, unicode_literals
from flask import Flask, request
from celery import Celery, current_app
from celery.signals import after_task_publish
# internal module for debugging purposes
from celery._state import default_app, _tls
# the flask application
flask_app = Flask(__name__)
# the celery application
celery_app = Celery('tasks', broker='amqp://', backend='redis://')
# debugging info
debug = """
[{location}]
celery_app = {celery_app}
current_app = {current_app}
add.app = {add_app}
default_app = {default_app}
_tls.current_app = {tls_current_app}
"""
# a simple task for testing (defined before the first print, which reads add.app)
@celery_app.task(name='add')
def add(a, b):
    return a + b

print(debug.format(
    location='OUTSIDE VIEW',
    celery_app=celery_app,
    current_app=current_app,
    add_app=add.app,
    default_app=default_app,
    tls_current_app=_tls.current_app
))

# fired after a task is published
@after_task_publish.connect
def after_publish(sender=None, body=None, **kwargs):
    print(debug.format(
        location='INSIDE SIGNAL FUNCTION',
        celery_app=celery_app,
        current_app=current_app,
        add_app=add.app,
        default_app=default_app,
        tls_current_app=_tls.current_app
    ))

@flask_app.route('/add')
def add_view():
    print(debug.format(
        location='INSIDE VIEW',
        celery_app=celery_app,
        current_app=current_app,
        add_app=add.app,
        default_app=default_app,
        tls_current_app=_tls.current_app
    ))
    a = request.args.get('a')
    b = request.args.get('b')
    task = add.delay(a, b)
    return task.task_id

if __name__ == '__main__':
    flask_app.run(debug=True)
And here's the output:
[OUTSIDE VIEW]
celery_app = <Celery tasks:0xb69ede4c>
current_app = <Celery tasks:0xb69ede4c>
add.app = <Celery tasks:0xb69ede4c>
default_app = None
_tls.current_app = <Celery tasks:0xb69ede4c>
[INSIDE VIEW]
celery_app = <Celery tasks:0xb69ede4c>
current_app = <Celery default:0xb6b0546c>
add.app = <Celery tasks:0xb69ede4c>
default_app = None
_tls.current_app = None # making current_app fallback to the default instance
[INSIDE SIGNAL FUNCTION]
celery_app = <Celery tasks:0xb69ede4c>
current_app = <Celery default:0xb6a174ec>
add.app = <Celery tasks:0xb69ede4c>
default_app = None
_tls.current_app = None
Because _tls.current_app is None inside the view, celery.current_app refers to the default instance. From celery._state._get_current_app:
return _tls.current_app or default_app
_tls is an instance of celery._state._TLS:
class _TLS(threading.local):
    #: Apps with the :attr:`~celery.app.base.BaseApp.set_as_current` attribute
    #: sets this, so it will always contain the last instantiated app,
    #: and is the default app returned by :func:`app_or_default`.
    current_app = None
Is the problem related to threading? Could this be a bug, or is this expected behavior?
Note that I can use the actual Celery instance in my hooked function and everything works fine, but I'm concerned that celery.current_app is used somewhere else, which would break my code.
I found the problem when I ran the Flask app without debugging enabled and it worked without any problems. When debug is True, the reloader is used, which runs the app in another thread; this happens in the werkzeug._reloader.run_with_reloader function.
According to the Python docs for the class threading.local, which is subclassed to store the current app instance:
A class that represents thread-local data. Thread-local data are data whose values are thread specific.
The instance’s values will be different for separate threads.
So this means that celery._state._tls.current_app is not shared between threads, and we have to set the Celery instance manually as the current app, for example in the view function:
celery_app.set_current()