Why is my scoped_session raising an AttributeError: 'Session' object has no attribute 'remove'?

I'm trying to set up a system that elegantly defers database operations to a separate thread in order to avoid blocking during Twisted callbacks.

So far, here is my approach:

from contextlib import contextmanager
from functools import wraps

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

from twisted.internet.threads import deferToThread

_engine = create_engine(initialization_string)
Session = scoped_session(sessionmaker(bind=_engine))


@contextmanager
def transaction_context():
    session = Session()
    try:
        yield session
        session.commit()
    except:
        # No need to do session.rollback().  session.remove will do it.
        raise
    finally:
        session.remove()


def threaded(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
        return deferToThread(fn, *args, **kwargs)
    return wrapper

This should allow me to wrap a function with the threaded decorator and then use the transaction_context context manager in said function's body. Below is an example:

from __future__ import print_function
from my_lib.orm import User, transaction_context, threaded
from twisted.internet import reactor


@threaded
def get_n_users(n):
    with transaction_context() as session:
        return session.query(User).limit(n).all()

if __name__ == '__main__':
    get_n_users(n).addBoth(len)
    reactor.run()

However, when I run the above script, I get a failure containing the following traceback:

Unhandled error in Deferred:
Unhandled Error
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 781, in __bootstrap
    self.__bootstrap_inner()
  File "/usr/lib/python2.7/threading.py", line 808, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 761, in run
    self.__target(*self.__args, **self.__kwargs)
--- <exception caught here> ---
  File "/usr/local/lib/python2.7/dist-packages/twisted/python/threadpool.py", line 191, in _worker
    result = context.call(ctx, function, *args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/twisted/python/context.py", line 118, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "/usr/local/lib/python2.7/dist-packages/twisted/python/context.py", line 81, in callWithContext
    return func(*args,**kw)
  File "testaccess.py", line 9, in get_n_users
    return session.query(User).limit(n).all()
  File "/usr/lib/python2.7/contextlib.py", line 24, in __exit__
    self.gen.next()
  File "/home/louis/Documents/Python/knacki/knacki/db.py", line 36, in transaction_context
    session.remove()
exceptions.AttributeError: 'Session' object has no attribute 'remove'

I was not expecting this at all. What am I missing? Did I not instantiate my scoped_session properly?

Edit: Here is a related question about integrating this setup with Twisted. It might help clarify what I'm trying to achieve.

Louis Thibault asked Jan 12 '14 18:01


1 Answer

Short answer:

Call .remove() on Session (the scoped_session object), not on session (the Session instance it returns).
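Applied to the context manager from the question, the fix might look like the sketch below. It assumes an in-memory SQLite engine in place of the question's initialization_string, and it adds an explicit rollback for clarity; neither is part of the original code.

```python
from contextlib import contextmanager

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

# Assumption: an in-memory SQLite database stands in for the real engine URL.
_engine = create_engine('sqlite:///:memory:')
Session = scoped_session(sessionmaker(bind=_engine))


@contextmanager
def transaction_context():
    # Calling the scoped_session returns the Session instance for this thread.
    session = Session()
    try:
        yield session
        session.commit()
    except Exception:
        # Explicit rollback added for clarity (not in the question's code).
        session.rollback()
        raise
    finally:
        # remove() lives on the scoped_session object, not on the instance.
        Session.remove()
```

Because Session.remove() acts on whichever session belongs to the calling thread, the same context manager should work unchanged inside a function run through deferToThread.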

Long answer:

scoped_session doesn't really return a Session class. Instead, it creates a registry object that pays attention to which thread it's called in. Calling it returns the existing Session instance associated with the current thread or, if there is none, creates a new one, associates it with the thread, and returns that. The association between thread and session is kept in a thread local.
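A quick way to see the distinction that causes the AttributeError is sketched below, assuming an in-memory SQLite engine for illustration. The module-level Session is the scoped_session object and has remove(); the object returned by calling it is a plain Session instance and does not.

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import Session as SessionClass
from sqlalchemy.orm import scoped_session, sessionmaker

# Assumption: an in-memory SQLite database, used only for illustration.
_engine = create_engine('sqlite:///:memory:')
Session = scoped_session(sessionmaker(bind=_engine))

# Calling the registry yields the Session instance for the current thread.
session = Session()

registry_is_scoped = isinstance(Session, scoped_session)
instance_is_session = isinstance(session, SessionClass)

# remove() is defined on the registry, not on the instance it hands out.
registry_has_remove = hasattr(Session, 'remove')
instance_has_remove = hasattr(session, 'remove')

# A second call in the same thread returns the very same instance.
same_instance = Session() is session

# After remove(), the current thread no longer has a session registered.
Session.remove()
bound_after_remove = Session.registry.has()

print(registry_is_scoped, instance_is_session)
print(registry_has_remove, instance_has_remove)
print(same_instance, bound_after_remove)
```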

The remove method on a scoped_session object removes the session object currently associated with the thread in which it's called. That means it's the opposite of scoped_session.__call__, which is kind of a confusing API.

Here's a short Python script to illustrate the behavior.

from __future__ import print_function  # print() behaves the same on Python 2 and 3

import threading
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

_engine = create_engine('sqlite:///:memory:')
Session = scoped_session(sessionmaker(bind=_engine))


def scoped_session_demo(remove=False):
    ids = []

    def push_ids():
        # Record the thread name and the sessions from two successive calls.
        thread_name = threading.current_thread().name
        data = [thread_name]

        data.append(Session())
        if remove:
            # Discard this thread's session so the next call creates a new one.
            Session.remove()
        data.append(Session())

        ids.append(data)

    # Run once in a worker thread, then once in the main thread.
    t = threading.Thread(target=push_ids)
    t.start()
    t.join()

    push_ids()

    sub_thread, main_thread = ids

    sub_name, sub_session_a, sub_session_b = sub_thread
    main_name, main_session_a, main_session_b = main_thread

    # Same thread: are the two sessions the same object?
    print(sub_name, sub_session_a == sub_session_b)
    print(main_name, main_session_a == main_session_b)
    # Different threads: sessions are never shared.
    print(sub_name, '==', main_name, sub_session_a == main_session_b)


print('Without remove:')
scoped_session_demo()
print('With remove:')
scoped_session_demo(True)

Its output:

Without remove:
Thread-1 True
MainThread True
Thread-1 == MainThread False
With remove:
Thread-2 False
MainThread False
Thread-2 == MainThread False
Mark Williams answered Nov 13 '22 22:11