Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

SQLAlchemy proper session handling in multi-thread applications

Tags:

I have trouble understanding how to properly open and close database sessions efficiently, as I understood by the sqlalchemy documentation, if I use scoped_session to construct my Session object, and then use the returned Session object to create sessions, it's threadsafe, so basically every thread will get it's own session, and there won't be problems with it. Now the below example works, I put it in an infinite loop to see if it properly closes the sessions and if I monitored it correctly (in mysql by executing "SHOW PROCESSLIST;"), the connections just keep growing, it does not close them, even though I used session.close(), and even remove the scoped_session object at the end of each run. What am I doing wrong? My goal in a larger application is to use the minimum number of database connections required, because my current working implementation creates a new session in every method where it is required and closes it at before returning, which seems inefficient.

from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, scoped_session from threading import Thread from Queue import Queue, Empty as QueueEmpty from models import MyModel   DATABASE_CONNECTION_INFO = 'mysql://username:password@localhost:3306/dbname'   class MTWorker(object):      def __init__(self, worker_count=5):         self.task_queue = Queue()         self.worker_count = worker_count         self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False)         self.DBSession = scoped_session(             sessionmaker(                 autoflush=True,                 autocommit=False,                 bind=self.db_engine             )         )      def _worker(self):         db_session = self.DBSession()         while True:             try:                 task_id = self.task_queue.get(False)                 try:                     item = db_session.query(MyModel).filter(MyModel.id == task_id).one()                     # do something with item                 except Exception as exc:                     # if an error occurrs we skip it                     continue                  finally:                     db_session.commit()                     self.task_queue.task_done()             except QueueEmpty:                 db_session.close()                 return      def start(self):         try:             db_session = self.DBSession()             all_items = db_session.query(MyModel).all()             for item in all_items:                 self.task_queue.put(item.id)              for _i in range(self.worker_count):                 t = Thread(target=self._worker)                 t.start()              self.task_queue.join()         finally:             db_session.close()             self.DBSession.remove()   if __name__ == '__main__':     while True:         mt_worker = MTWorker(worker_count=50)         mt_worker.start() 
like image 857
andrean Avatar asked Mar 08 '12 15:03

andrean


1 Answers

You should only be calling create_engine and scoped_session once per process (per database). Each will get its own pool of connections or sessions (respectively), so you want to make sure you're only creating one pool. Just make it a module level global. if you need to manage your sessions more preciesly than that, you probably shouldn't be using scoped_session

Another change to make is to use DBSession directly as though it were a session. calling session methods on the scoped_session will transparently create a thread-local session, if needed, and forward the method call to the session.

Another thing to be aware of is the pool_size of the connection pool, which is 5 by default. For many applications that's fine, but if you are creating lots of threads, you might need to tune that parameter

DATABASE_CONNECTION_INFO = 'mysql://username:password@localhost:3306/dbname' db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False) DBSession = scoped_session(     sessionmaker(         autoflush=True,         autocommit=False,         bind=db_engine     ) )   class MTWorker(object):      def __init__(self, worker_count=5):         self.task_queue = Queue()         self.worker_count = worker_count # snip 
like image 167
SingleNegationElimination Avatar answered Sep 19 '22 00:09

SingleNegationElimination