Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why `celery.current_app` refers the default instance inside Flask view functions

I'm not trying to use celery.current_app inside the view function, but I have a function hooked to the after_task_publish signal which is using it to update the state after the task is published, it's working outside the Flask view function and updating the state correctly, but when I send the task from inside the view function the task state won't be updated, I checked and the problem is that the current_app.backend is an instance of DisabledBackend which is the default instead of being an instance of RedisBackend which I'm using.

This is happening because inside the Flask view function the proxy to the current Celery instance celery.current_app is referring to the default instance which is created when there's no current Celery instance.

I tried reproducing what's happening and here's a test script:

from __future__ import absolute_import, print_function, unicode_literals

from flask import Flask, request

from celery import Celery, current_app
from celery.signals import after_task_publish
# internal module for debugging purposes
from celery._state import default_app, _tls


# the flask application
flask_app = Flask(__name__)

# the celery application
celery_app = Celery('tasks', broker='amqp://', backend='redis://')

# debugging info
debug = """
[{location}]
celery_app       = {celery_app}
current_app      = {current_app}
add.app          = {add_app}
default_app      = {default_app}
_tls.current_app = {tls_current_app}
"""

print(debug.format(
    location = 'OUTSIDE VIEW',
    celery_app = celery_app,
    current_app = current_app,
    add_app = add.app,
    default_app = default_app,
    tls_current_app = _tls.current_app
))


# fired after a task is published
@after_task_publish.connect
def after_publish(sender=None, body=None, **kwargs):
    print(debug.format(
        location = 'INSIDE SIGNAL FUNCTION',
        celery_app = celery_app,
        current_app = current_app,
        add_app = add.app,
        default_app = default_app,
        tls_current_app = _tls.current_app
    ))

# a simple task for testing
@celery_app.task(name='add')
def add(a, b):
    return a + b


@flask_app.route('/add')
def add_view():
    print(debug.format(
        location = 'INSIDE VIEW',
        celery_app = celery_app,
        current_app = current_app,
        add_app = add.app,
        default_app = default_app,
        tls_current_app = _tls.current_app
    ))

    a = request.args.get('a')
    b = request.args.get('b')

    task = add.delay(a, b)

    return task.task_id


if __name__ == '__main__':
    flask_app.run(debug=True)

And here's the output:

[OUTSIDE VIEW]
celery_app       = <Celery tasks:0xb69ede4c>
current_app      = <Celery tasks:0xb69ede4c>
add.app          = <Celery tasks:0xb69ede4c>
default_app      = None
_tls.current_app = <Celery tasks:0xb69ede4c>


[INSIDE VIEW]
celery_app       = <Celery tasks:0xb69ede4c>
current_app      = <Celery default:0xb6b0546c>
add.app          = <Celery tasks:0xb69ede4c>
default_app      = None
_tls.current_app = None   # making current_app fallback to the default instance


[INSIDE SIGNAL FUNCTION]
celery_app       = <Celery tasks:0xb69ede4c>
current_app      = <Celery default:0xb6a174ec>
add.app          = <Celery tasks:0xb69ede4c>
default_app      = None
_tls.current_app = None

Because _tls.current_app is None inside the view, this is why the celery.current_app is refering to the default instance, from celery._state._get_current_app:

return _tls.current_app or default_app

_tls is an instance of celery._state._TLS:

class _TLS(threading.local):
    #: Apps with the :attr:`~celery.app.base.BaseApp.set_as_current` attribute
    #: sets this, so it will always contain the last instantiated app,
    #: and is the default app returned by :func:`app_or_default`.
    current_app = None

Is the problem related to threading ? could this be a bug ? or is this an expected behavior ?

Note that I can use the actual celery instance in my hooked function and everything will work just fine, but I'm concerned about celery.current_app that is used somewhere else that will break my code.

like image 269
Pierre Avatar asked Oct 23 '14 11:10

Pierre


1 Answers

I found the problem when I ran the Flask app without debugging enabled and it worked without any problems, when debug is True the reloader is used which runs the app in another thread, this happens in werkzeug._reloader.run_with_reloader function.

And depending on the Python docs about the class threading.local which is subclassed to store the current app instance:

A class that represents thread-local data. Thread-local data are data whose values are thread specific.

The instance’s values will be different for separate threads.

So this means that celery._state._tls.current_app is not shared between threads, and we have to set the celery instance manually as the current app, for example in the view function:

celery_app.set_current()
like image 65
Pierre Avatar answered Oct 17 '22 10:10

Pierre