 

How to set up a SQLAlchemy session in Celery tasks without a global variable

Summary: I want to use a SQLAlchemy session in Celery tasks without having a global variable that holds that session.

I am using SQLAlchemy in a project with Celery tasks, and I'm having trouble setting up the session.

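Currently, I have a global variable 'session' defined alongside my Celery app setup (celery.py), and a worker signal configures it:

from celery.signals import celeryd_init
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

# Unbound session registry, imported by the tasks module
session = scoped_session(sessionmaker())

@celeryd_init.connect
def configure_workers(sender=None, conf=None, **kwargs):
    # Read the database URI from the application configuration
    db_uri = conf['db_uri']
    engine = create_engine(db_uri)
    session.configure(bind=engine)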

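In the module defining the tasks, I simply import 'session' and use it. Tasks are defined with a custom class that removes the session after each task returns:

from celery import Task
from .celery import session  # the global scoped_session from celery.py

class DBTask(Task):
    def after_return(self, *args, **kwargs):
        session.remove()  # close and discard this thread's session after each task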
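For context, `scoped_session` returns the same `Session` object for repeated calls in the same thread until `remove()` is called; the next call after that builds a fresh one. That is why calling `session.remove()` in `after_return` gives each task a clean session. A minimal sketch, assuming SQLAlchemy is installed and using an in-memory SQLite URI as a stand-in for the real `db_uri`:

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import scoped_session, sessionmaker

# In-memory SQLite stands in for the real database URI (assumption).
engine = create_engine("sqlite://")
session = scoped_session(sessionmaker(bind=engine))

# Within one thread, the registry hands back the same Session every time.
first = session()
assert first is session()

# Calls on the scoped_session are proxied to that thread's Session.
one = session.execute(text("SELECT 1")).scalar()

# remove() closes the current Session and drops it from the registry,
# which is what DBTask.after_return does once a task has finished.
session.remove()
second = session()
fresh = second is not first
```

The next task running in the same thread therefore never reuses a session left in an unknown state by the previous one.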

That works well. However, when unit testing with CELERY_ALWAYS_EAGER=True, the session is never configured: no worker is started, so the celeryd_init signal is never sent. The only solution I've found so far is to mock that 'session' variable when running a task in a unit test:

from unittest import mock

with mock.patch('celerymodule.tasks.session', self.session):  # self.session: the test's own session
    do_something.delay(...)

While it works, I don't want to do that.

Is there any way to set up a session that is not a global variable and that works both for normal asynchronous execution and without workers when CELERY_ALWAYS_EAGER=True?

Asked Aug 13 '15 at 22:08 by Clément Schreiner



1 Answer

The answer was right under my nose in the official documentation about custom task classes.

I modified the custom task class that I use for tasks accessing the database:

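from celery import Task


class DBTask(Task):
    # Cached on the task instance; Celery creates one instance per process
    _session = None

    def after_return(self, *args, **kwargs):
        # Close and discard the session once the task has returned
        if self._session is not None:
            self._session.remove()

    @property
    def session(self):
        # Build the session lazily, on first access
        if self._session is None:
            # _get_engine_session is a project helper not shown here; it is
            # assumed to return an (engine, scoped_session) pair, since
            # .remove() is later called on the second element
            _, self._session = _get_engine_session(self.conf['db_uri'],
                                                   verbose=False)

        return self._session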
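The answer does not show `_get_engine_session`. A minimal sketch of what such a helper might look like, assuming it returns an `(engine, scoped_session)` pair and that `verbose` maps to SQLAlchemy's `echo` flag; both the helper body and that mapping are guesses, not the author's code:

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker


def _get_engine_session(db_uri, verbose=False):
    """Hypothetical helper: return (engine, scoped_session) for db_uri.

    `verbose` is assumed to toggle SQLAlchemy's SQL echo logging.
    """
    engine = create_engine(db_uri, echo=verbose)
    # A scoped_session (not a plain Session) is returned, because
    # DBTask.after_return calls .remove(), which only scoped_session has.
    session = scoped_session(sessionmaker(bind=engine))
    return engine, session


# Example call with an in-memory SQLite URI (assumption).
engine, session = _get_engine_session("sqlite://", verbose=False)
```

Returning a `scoped_session` keeps `after_return` valid as written; a helper returning a plain `Session` would need `close()` instead of `remove()`.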

I define my tasks this way:

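# 'app' is the Celery app from celery.py; 'Thing' is a mapped model
@app.task(base=DBTask, bind=True)
def do_stuff_with_db(self, conf, some_arg):
    # bind=True makes 'self' the task instance, exposing DBTask.session
    self.conf = conf  # DBTask.session reads conf['db_uri'] on first access
    thing = self.session.query(Thing).filter_by(arg=some_arg).first()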

That way, because Celery instantiates each task class only once per process, the SQLAlchemy session is created lazily, once for each Celery worker process, and I don't need any global variable.
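The "once per process" behaviour comes from two things together: Celery registers a single instance of each task class per process, and the `session` property only builds a session while `_session` is still `None`. A minimal, dependency-free sketch of just that lazy caching pattern; `FakeTask` and `make_session` are hypothetical stand-ins, not Celery or SQLAlchemy APIs:

```python
# Hypothetical stand-ins: make_session() plays the role of the project's
# _get_engine_session(), and FakeTask mirrors DBTask's lazy property.
factory_calls = 0


def make_session():
    """Pretend to open a session; counts how often it is called."""
    global factory_calls
    factory_calls += 1
    return object()


class FakeTask:
    _session = None  # class-level default, shadowed on the instance once set

    @property
    def session(self):
        if self._session is None:
            self._session = make_session()
        return self._session


# Celery keeps a single task instance per process, so repeated task calls in
# that process reach the same object, modeled here by one FakeTask instance.
task = FakeTask()
sessions = [task.session for _ in range(3)]
```

A second worker process would hold its own task instance, and therefore its own session.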

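This solves the problem with my unit tests: the session is now created on first access inside the task rather than by a worker signal, so its setup is independent of the Celery workers and also works when tasks run eagerly with CELERY_ALWAYS_EAGER=True.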

Answered Nov 19 '22 at 01:11 by Clément Schreiner
