
Connection problems with SQLAlchemy and multiple processes


I'm using PostgreSQL and SQLAlchemy in a project that consists of a main process which launches child processes. All of these processes access the database via SQLAlchemy.

I'm experiencing repeatable connection failures: The first few child processes work correctly, but after a while a connection error is raised. Here's an MWCE:

    import multiprocessing

    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, create_engine
    from sqlalchemy.orm import sessionmaker

    DB_URL = 'postgresql://user:password@localhost/database'

    Base = declarative_base()

    class Dummy(Base):
        __tablename__ = 'dummies'
        id = Column(Integer, primary_key=True)
        value = Column(Integer)

    engine = None
    Session = None
    session = None

    def init():
        global engine, Session, session
        engine = create_engine(DB_URL)
        Base.metadata.create_all(engine)
        Session = sessionmaker(bind=engine)
        session = Session()

    def cleanup():
        session.close()
        engine.dispose()

    def target(id):
        init()
        try:
            dummy = session.query(Dummy).get(id)
            dummy.value += 1
            session.add(dummy)
            session.commit()
        finally:
            cleanup()

    def main():
        init()
        try:
            dummy = Dummy(value=1)
            session.add(dummy)
            session.commit()
            p = multiprocessing.Process(target=target, args=(dummy.id,))
            p.start()
            p.join()
            session.refresh(dummy)
            assert dummy.value == 2
        finally:
            cleanup()

    if __name__ == '__main__':
        i = 1
        while True:
            print(i)
            main()
            i += 1

On my system (PostgreSQL 9.6, SQLAlchemy 1.1.4, psycopg2 2.6.2, Python 2.7, Ubuntu 14.04) this yields

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    Traceback (most recent call last):
      File "./fork_test.py", line 64, in <module>
        main()
      File "./fork_test.py", line 55, in main
        session.refresh(dummy)
      File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1422, in refresh
        only_load_props=attribute_names) is None:
      File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 223, in load_on_ident
        return q.one()
      File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2756, in one
        ret = self.one_or_none()
      File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2726, in one_or_none
        ret = list(self)
      File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2797, in __iter__
        return self._execute_and_instances(context)
      File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2820, in _execute_and_instances
        result = conn.execute(querycontext.statement, self._params)
      File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 945, in execute
        return meth(self, multiparams, params)
      File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection
        return connection._execute_clauseelement(self, multiparams, params)
      File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement
        compiled_sql, distilled_params
      File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context
        context)
      File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1393, in _handle_dbapi_exception
        exc_info
      File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause
        reraise(type(exception), exception, tb=exc_tb, cause=cause)
      File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
        context)
      File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 469, in do_execute
        cursor.execute(statement, parameters)
    sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
        This probably means the server terminated abnormally
        before or while processing the request.
     [SQL: 'SELECT dummies.id AS dummies_id, dummies.value AS dummies_value \nFROM dummies \nWHERE dummies.id = %(param_1)s'] [parameters: {'param_1': 11074}]

This is repeatable and always crashes at the same iteration.

I'm creating a new engine and session after the fork as recommended by the SQLAlchemy documentation and elsewhere. Interestingly, the following slightly different approach does not crash:

    import contextlib
    import multiprocessing

    import sqlalchemy
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, create_engine
    from sqlalchemy.orm import sessionmaker

    DB_URL = 'postgresql://user:password@localhost/database'

    Base = declarative_base()

    class Dummy(Base):
        __tablename__ = 'dummies'
        id = Column(Integer, primary_key=True)
        value = Column(Integer)

    @contextlib.contextmanager
    def get_session():
        engine = sqlalchemy.create_engine(DB_URL)
        Base.metadata.create_all(engine)
        Session = sessionmaker(bind=engine)
        session = Session()
        try:
            yield session
        finally:
            session.close()
            engine.dispose()

    def target(id):
        with get_session() as session:
            dummy = session.query(Dummy).get(id)
            dummy.value += 1
            session.add(dummy)
            session.commit()

    def main():
        with get_session() as session:
            dummy = Dummy(value=1)
            session.add(dummy)
            session.commit()
            p = multiprocessing.Process(target=target, args=(dummy.id,))
            p.start()
            p.join()
            session.refresh(dummy)
            assert dummy.value == 2

    if __name__ == '__main__':
        i = 1
        while True:
            print(i)
            main()
            i += 1

Since the original code is more complex and cannot simply be switched over to the latter version, I'd like to understand why one of these works and the other doesn't.

The only obvious difference is that the crashing code uses global variables for the engine and the session -- these are shared via copy-on-write with the child processes. However, since I reset them directly after the fork I don't understand how that could be a problem.

Update

I re-ran the two code pieces with the latest SQLAlchemy (1.1.5) using both Python 2.7 and Python 3.4. On both the results are basically as described above. However, on Python 2.7 the crash of the first code piece now happens in the 13th iteration (reproducibly) while on 3.4 it already happens in the third iteration (also reproducibly). The second code piece runs without problems on both versions. Here's the traceback from 3.4:

    1
    2
    3
    Traceback (most recent call last):
      File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
        context)
      File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
        cursor.execute(statement, parameters)
    psycopg2.OperationalError: server closed the connection unexpectedly
        This probably means the server terminated abnormally
        before or while processing the request.


    The above exception was the direct cause of the following exception:

    Traceback (most recent call last):
      File "fork_test.py", line 64, in <module>
        main()
      File "fork_test.py", line 55, in main
        session.refresh(dummy)
      File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 1424, in refresh
        only_load_props=attribute_names) is None:
      File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/loading.py", line 223, in load_on_ident
        return q.one()
      File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2749, in one
        ret = self.one_or_none()
      File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2719, in one_or_none
        ret = list(self)
      File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2790, in __iter__
        return self._execute_and_instances(context)
      File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2813, in _execute_and_instances
        result = conn.execute(querycontext.statement, self._params)
      File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 945, in execute
        return meth(self, multiparams, params)
      File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection
        return connection._execute_clauseelement(self, multiparams, params)
      File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement
        compiled_sql, distilled_params
      File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context
        context)
      File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1393, in _handle_dbapi_exception
        exc_info
      File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
        reraise(type(exception), exception, tb=exc_tb, cause=cause)
      File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
        raise value.with_traceback(tb)
      File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
        context)
      File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
        cursor.execute(statement, parameters)
    sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
        This probably means the server terminated abnormally
        before or while processing the request.
     [SQL: 'SELECT dummies.id AS dummies_id, dummies.value AS dummies_value \nFROM dummies \nWHERE dummies.id = %(param_1)s'] [parameters: {'param_1': 3397}]

Here's the PostgreSQL log (it's the same for 2.7 and 3.4):

    2017-01-18 10:59:36 UTC [22429-1] LOG:  database system was shut down at 2017-01-18 10:59:35 UTC
    2017-01-18 10:59:36 UTC [22429-2] LOG:  MultiXact member wraparound protections are now enabled
    2017-01-18 10:59:36 UTC [22428-1] LOG:  database system is ready to accept connections
    2017-01-18 10:59:36 UTC [22433-1] LOG:  autovacuum launcher started
    2017-01-18 10:59:36 UTC [22435-1] [unknown]@[unknown] LOG:  incomplete startup packet
    2017-01-18 11:00:10 UTC [22466-1] user@db LOG:  SSL error: decryption failed or bad record mac
    2017-01-18 11:00:10 UTC [22466-2] user@db LOG:  could not receive data from client: Connection reset by peer

(Note that the message about the incomplete startup packet is harmless.)

Asked by Florian Brucker on Dec 22 '16 at 08:12





1 Answer

Quoting "How do I use engines / connections / sessions with Python multiprocessing, or os.fork()?" with added emphasis:

The SQLAlchemy Engine object refers to a connection pool of existing database connections. So when this object is replicated to a child process, the goal is to ensure that no database connections are carried over.

and

However, for the case of a transaction-active Session or Connection being shared, there’s no automatic fix for this; an application needs to ensure a new child process only initiate new Connection objects and transactions, as well as ORM Session objects.

The issue stems from the forked child process inheriting the live global session, which is holding on to a Connection. When target calls init, it overwrites the global references to engine and session, which drops their refcounts to 0 in the child and forces them to finalize. (If you happened to create another reference to the inherited session in the child, you would prevent it from being cleaned up, but don't do that.) After main has joined the child and returns to business as usual, it tries to use the now potentially finalized, or otherwise out-of-sync, connection. I'm not sure why this causes an error only after a certain number of iterations.
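The mechanism can be reproduced without a database. The following is only a minimal sketch, assuming CPython (where dropping the last reference finalizes an object immediately) and a Unix system with os.fork. FakeConnection, init and demo are hypothetical stand-ins, not SQLAlchemy or psycopg2 APIs, and a socket pair plays the role of the link to the server. A real driver's shutdown is more involved, but the effect should be similar: whatever the child's finalizer writes ends up on the connection the parent still believes it owns.

```python
import os
import socket


class FakeConnection:
    """Hypothetical stand-in for a DBAPI connection."""

    def __init__(self, sock):
        self.sock = sock

    def __del__(self):
        # Like a real driver, say goodbye to the server before closing.
        try:
            self.sock.sendall(b"X")
        except OSError:
            pass
        self.sock.close()


conn = None  # module-level global, like `session` in the question


def init(sock):
    """Rebind the global; the previous FakeConnection loses its last reference."""
    global conn
    conn = FakeConnection(sock)


def demo():
    global conn
    server_end, client_end = socket.socketpair()
    init(client_end)

    pid = os.fork()
    if pid == 0:
        # Child: re-initialise right after the fork, as target() does.
        try:
            _, fresh_client = socket.socketpair()
            init(fresh_client)  # finalizes the inherited FakeConnection here
        finally:
            os._exit(0)  # exit immediately, skipping interpreter shutdown

    os.waitpid(pid, 0)
    server_end.settimeout(1.0)
    received = server_end.recv(1)  # data the parent never sent

    conn = None  # parent-side cleanup
    server_end.close()
    return received


if __name__ == "__main__":
    print(demo())
```

The parent never writes to its connection, yet the "server" end receives the goodbye byte written by the child's finalizer. With a real SSL-wrapped PostgreSQL connection, a write like that from the child would plausibly leave the parent's connection out of sync with the server, which would fit the "bad record mac" entry in the PostgreSQL log.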

The only way to handle this situation using globals the way you do is to

  1. Close all sessions
  2. Call engine.dispose()

before forking. This will prevent connections from leaking to the child. For example:

    def main():
        global session
        init()
        try:
            dummy = Dummy(value=1)
            session.add(dummy)
            session.commit()
            dummy_id = dummy.id
            # Return the Connection to the pool
            session.close()
            # Dispose of it!
            engine.dispose()
            # ...or call your cleanup() function, which does the same
            p = multiprocessing.Process(target=target, args=(dummy_id,))
            p.start()
            p.join()
            # Start a new session
            session = Session()
            dummy = session.query(Dummy).get(dummy_id)
            assert dummy.value == 2
        finally:
            cleanup()
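If the code forks in several places, the "release before forking, reacquire after joining" ordering could be wrapped in a small helper. This is a sketch under assumptions, not part of SQLAlchemy: released_for_fork, events, child_work and run_child are hypothetical names, and the lambdas stand in for your own cleanup() and init() functions so that the example runs without a database. It also assumes a platform where the "fork" start method is available.

```python
import contextlib
import multiprocessing

events = []  # records the order of operations, for illustration only


@contextlib.contextmanager
def released_for_fork(release, reacquire):
    """Call release() before the block and reacquire() after it."""
    release()
    try:
        yield
    finally:
        reacquire()


def child_work():
    # In the real code this would call init(), do its work, then cleanup().
    pass


def run_child():
    ctx = multiprocessing.get_context("fork")
    with released_for_fork(lambda: events.append("release"),
                           lambda: events.append("reacquire")):
        # Nothing database-related is open here, so nothing can leak.
        p = ctx.Process(target=child_work)
        p.start()
        events.append("forked")
        p.join()
    return p.exitcode
```

In the question's code, the two callables would be cleanup and init. Any ORM objects loaded before the block have to be fetched again afterwards, as the answer's example does with dummy_id.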

Your second example does not trigger finalization in the child, so it only seems to work. It may be just as broken as the first, because the child still inherits a copy of the session, and of its connection, defined locally in main.

Answered by Ilja Everilä on Oct 11 '22 at 10:10
