I'm trying to set up a system that elegantly defers database operations to a seperate thread in order to avoid blocking during Twisted callbacks.
So far, here is my approach:
from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from twisted.internet.threads import deferToThread
_engine = create_engine(initialization_string)
Session = scoped_session(sessionmaker(bind=_engine))
@contextmanager
def transaction_context():
session = Session()
try:
yield session
session.commit()
except:
# No need to do session.rollback(). session.remove will do it.
raise
finally:
session.remove()
def threaded(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
return deferToThread(fn, *args, **kwargs)
return wrapper
This should allow me to wrap a function with the threaded
decorator and then use the transaction_context
context manager in said function's body. Below is an example:
from __future__ import print_function
from my_lib.orm import User, transaction_context, threaded
from twisted.internet import reactor
@threaded
def get_n_users(n):
with transaction_context() as session:
return session.query(User).limit(n).all()
if __name__ == '__main__':
get_n_users(n).addBoth(len)
reactor.run()
However, when I run the above script, I get a failure containing the following traceback:
Unhandled error in Deferred:
Unhandled Error
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 781, in __bootstrap
self.__bootstrap_inner()
File "/usr/lib/python2.7/threading.py", line 808, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 761, in run
self.__target(*self.__args, **self.__kwargs)
--- <exception caught here> ---
File "/usr/local/lib/python2.7/dist-packages/twisted/python/threadpool.py", line 191, in _worker
result = context.call(ctx, function, *args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/twisted/python/context.py", line 118, in callWithContext
return self.currentContext().callWithContext(ctx, func, *args, **kw)
File "/usr/local/lib/python2.7/dist-packages/twisted/python/context.py", line 81, in callWithContext
return func(*args,**kw)
File "testaccess.py", line 9, in get_n_users
return session.query(User).limit(n).all()
File "/usr/lib/python2.7/contextlib.py", line 24, in __exit__
self.gen.next()
File "/home/louis/Documents/Python/knacki/knacki/db.py", line 36, in transaction_context
session.remove()
exceptions.AttributeError: 'Session' object has no attribute 'remove'
I was not expecting this at all. What am I missing? Did I not instantiate my scoped_session
properly?
Edit: Here is a related question about integrating this setup with Twisted. It might help clarify what I'm trying to achieve.
Short answer
Call .remove()
on Session
, not session
.
Long answer:
scoped_session
doesn't really return a Session
class. Instead, it creates an object that pays attention to which thread it's called in. Calling it will either return the existing Session
instance associated with that thread or associate a new one and return that. A thread local is what associates a thread with a session.
The remove
method on a scoped_session
object removes the session object currently associated with the thread in which it's called. That means it's the opposite of scoped_session.__call__
, which is kind of a confusing API.
Here's a short Python script to illustrate the behavior.
import threading
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
_engine = create_engine('sqlite:///:memory:')
Session = scoped_session(sessionmaker(_engine))
def scoped_session_demo(remove=False):
ids = []
def push_ids():
thread_name = threading.currentThread().getName()
data = [thread_name]
data.append(Session())
if remove:
Session.remove()
data.append(Session())
ids.append(data)
t = threading.Thread(target=push_ids)
t.start()
t.join()
push_ids()
sub_thread, main_thread = ids
sub_name, sub_session_a, sub_session_b = sub_thread
main_name, main_session_a, main_session_b = main_thread
print sub_name, sub_session_a == sub_session_b
print main_name, main_session_a == main_session_b
print sub_name, '==', main_name, sub_session_a == main_session_b
print 'Without remove:'
scoped_session_demo()
print 'With remove:'
scoped_session_demo(True)
Its output:
Without remove:
Thread-1 True
MainThread True
Thread-1 == MainThread False
With remove:
Thread-2 False
MainThread False
Thread-2 == MainThread False
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With