I have trouble understanding how to open and close database sessions efficiently. As I understand the SQLAlchemy documentation, if I use `scoped_session` to construct my `Session` object, and then use the returned `Session` object to create sessions, it's thread-safe: every thread gets its own session, and there won't be problems with it.

The example below works, but when I put it in an infinite loop to see whether it properly closes the sessions (monitoring MySQL with `SHOW PROCESSLIST;`), the connections just keep growing; they are not closed, even though I call `session.close()` and even remove the `scoped_session` object at the end of each run. What am I doing wrong?

My goal in a larger application is to use the minimum number of database connections required, because my current working implementation creates a new session in every method that needs one and closes it before returning, which seems inefficient.
```python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from threading import Thread
from Queue import Queue, Empty as QueueEmpty

from models import MyModel

DATABASE_CONNECTION_INFO = 'mysql://username:password@localhost:3306/dbname'


class MTWorker(object):
    def __init__(self, worker_count=5):
        self.task_queue = Queue()
        self.worker_count = worker_count
        self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False)
        self.DBSession = scoped_session(
            sessionmaker(
                autoflush=True,
                autocommit=False,
                bind=self.db_engine
            )
        )

    def _worker(self):
        db_session = self.DBSession()
        while True:
            try:
                task_id = self.task_queue.get(False)
                try:
                    item = db_session.query(MyModel).filter(MyModel.id == task_id).one()
                    # do something with item
                except Exception as exc:
                    # if an error occurs we skip it
                    continue
                finally:
                    db_session.commit()
                    self.task_queue.task_done()
            except QueueEmpty:
                db_session.close()
                return

    def start(self):
        try:
            db_session = self.DBSession()
            all_items = db_session.query(MyModel).all()
            for item in all_items:
                self.task_queue.put(item.id)
            for _i in range(self.worker_count):
                t = Thread(target=self._worker)
                t.start()
            self.task_queue.join()
        finally:
            db_session.close()
            self.DBSession.remove()


if __name__ == '__main__':
    while True:
        mt_worker = MTWorker(worker_count=50)
        mt_worker.start()
```
You should only be calling `create_engine` and `scoped_session` once per process (per database). Each maintains its own pool of connections or sessions (respectively), so you want to make sure you're only creating one pool. Just make each one a module-level global. If you need to manage your sessions more precisely than that, you probably shouldn't be using `scoped_session`.
Another change to make is to use `DBSession` directly as though it were a session. Calling session methods on the `scoped_session` will transparently create a thread-local session, if needed, and forward the method call to it.
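A minimal sketch of that proxy behavior, using an in-memory SQLite engine purely for illustration (your engine URL and models would differ):

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker, scoped_session

engine = create_engine('sqlite://')  # in-memory SQLite, illustration only
DBSession = scoped_session(sessionmaker(bind=engine))

# Session methods called on the registry itself are forwarded to a
# thread-local Session, created on first use in each thread:
value = DBSession.execute(text('SELECT 1')).scalar()

# Repeated calls in the same thread return the same Session object:
assert DBSession() is DBSession()

# remove() closes the current thread's Session and discards it;
# the next use creates a fresh one.
old_session = DBSession()
DBSession.remove()
assert DBSession() is not old_session
```

So worker code never needs to instantiate or pass sessions around; it can call `DBSession.query(...)` directly and call `DBSession.remove()` when the thread is done with its work.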
Another thing to be aware of is the `pool_size` of the connection pool, which is 5 by default. For many applications that's fine, but if you are creating lots of threads, you might need to tune that parameter.
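For example, the pool can be sized when the engine is created. A sketch (a SQLite file URL stands in here so the snippet runs anywhere; in practice you would pass your `mysql://` URL, and the numbers below are illustrative, matching the `worker_count=50` from the question):

```python
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

# QueuePool is made explicit so pool_size applies regardless of the
# dialect's default pool class.
engine = create_engine(
    'sqlite:///example.db',   # stand-in URL; use your mysql:// URL in practice
    poolclass=QueuePool,
    pool_size=50,      # connections kept open in the pool
    max_overflow=10,   # extra connections allowed under burst load
)
print(engine.pool.size())  # 50
```

With `pool_size=50` and `max_overflow=10`, up to 60 connections can be checked out at once; requests beyond that block until a connection is returned to the pool.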
```python
DATABASE_CONNECTION_INFO = 'mysql://username:password@localhost:3306/dbname'

db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False)
DBSession = scoped_session(
    sessionmaker(
        autoflush=True,
        autocommit=False,
        bind=db_engine
    )
)


class MTWorker(object):
    def __init__(self, worker_count=5):
        self.task_queue = Queue()
        self.worker_count = worker_count
        # snip
```