I'm using PostgreSQL and SQLAlchemy in a project that consists of a main process which launches child processes. All of these processes access the database via SQLAlchemy.
I'm experiencing repeatable connection failures: The first few child processes work correctly, but after a while a connection error is raised. Here's an MWCE:
```python
import multiprocessing

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import sessionmaker

DB_URL = 'postgresql://user:password@localhost/database'

Base = declarative_base()


class Dummy(Base):
    __tablename__ = 'dummies'
    id = Column(Integer, primary_key=True)
    value = Column(Integer)


engine = None
Session = None
session = None


def init():
    global engine, Session, session
    engine = create_engine(DB_URL)
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()


def cleanup():
    session.close()
    engine.dispose()


def target(id):
    init()
    try:
        dummy = session.query(Dummy).get(id)
        dummy.value += 1
        session.add(dummy)
        session.commit()
    finally:
        cleanup()


def main():
    init()
    try:
        dummy = Dummy(value=1)
        session.add(dummy)
        session.commit()
        p = multiprocessing.Process(target=target, args=(dummy.id,))
        p.start()
        p.join()
        session.refresh(dummy)
        assert dummy.value == 2
    finally:
        cleanup()


if __name__ == '__main__':
    i = 1
    while True:
        print(i)
        main()
        i += 1
```
On my system (PostgreSQL 9.6, SQLAlchemy 1.1.4, psycopg2 2.6.2, Python 2.7, Ubuntu 14.04) this yields
```
1
2
3
4
5
6
7
8
9
10
11
Traceback (most recent call last):
  File "./fork_test.py", line 64, in <module>
    main()
  File "./fork_test.py", line 55, in main
    session.refresh(dummy)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1422, in refresh
    only_load_props=attribute_names) is None:
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 223, in load_on_ident
    return q.one()
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2756, in one
    ret = self.one_or_none()
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2726, in one_or_none
    ret = list(self)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2797, in __iter__
    return self._execute_and_instances(context)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2820, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 945, in execute
    return meth(self, multiparams, params)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1393, in _handle_dbapi_exception
    exc_info
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 469, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.
 [SQL: 'SELECT dummies.id AS dummies_id, dummies.value AS dummies_value \nFROM dummies \nWHERE dummies.id = %(param_1)s'] [parameters: {'param_1': 11074}]
```
This is repeatable and always crashes at the same iteration.
I'm creating a new engine and session after the fork as recommended by the SQLAlchemy documentation and elsewhere. Interestingly, the following slightly different approach does not crash:
```python
import contextlib
import multiprocessing

import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import sessionmaker

DB_URL = 'postgresql://user:password@localhost/database'

Base = declarative_base()


class Dummy(Base):
    __tablename__ = 'dummies'
    id = Column(Integer, primary_key=True)
    value = Column(Integer)


@contextlib.contextmanager
def get_session():
    engine = sqlalchemy.create_engine(DB_URL)
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)
    session = Session()
    try:
        yield session
    finally:
        session.close()
        engine.dispose()


def target(id):
    with get_session() as session:
        dummy = session.query(Dummy).get(id)
        dummy.value += 1
        session.add(dummy)
        session.commit()


def main():
    with get_session() as session:
        dummy = Dummy(value=1)
        session.add(dummy)
        session.commit()
        p = multiprocessing.Process(target=target, args=(dummy.id,))
        p.start()
        p.join()
        session.refresh(dummy)
        assert dummy.value == 2


if __name__ == '__main__':
    i = 1
    while True:
        print(i)
        main()
        i += 1
```
Since the original code is more complex and cannot simply be switched over to the latter version, I'd like to understand why one of these works and the other doesn't.
The only obvious difference is that the crashing code uses global variables for the engine and the session; these are shared with the child processes via copy-on-write. However, since I reset them directly after the fork, I don't understand how that could be a problem.
I re-ran the two code pieces with the latest SQLAlchemy (1.1.5) using both Python 2.7 and Python 3.4. On both the results are basically as described above. However, on Python 2.7 the crash of the first code piece now happens in the 13th iteration (reproducibly) while on 3.4 it already happens in the third iteration (also reproducibly). The second code piece runs without problems on both versions. Here's the traceback from 3.4:
```
1
2
3
Traceback (most recent call last):
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
    cursor.execute(statement, parameters)
psycopg2.OperationalError: server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "fork_test.py", line 64, in <module>
    main()
  File "fork_test.py", line 55, in main
    session.refresh(dummy)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/session.py", line 1424, in refresh
    only_load_props=attribute_names) is None:
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/loading.py", line 223, in load_on_ident
    return q.one()
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2749, in one
    ret = self.one_or_none()
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2719, in one_or_none
    ret = list(self)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2790, in __iter__
    return self._execute_and_instances(context)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/orm/query.py", line 2813, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 945, in execute
    return meth(self, multiparams, params)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1393, in _handle_dbapi_exception
    exc_info
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
    raise value.with_traceback(tb)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/home/vagrant/latest-sqlalchemy-3.4/lib/python3.4/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.
 [SQL: 'SELECT dummies.id AS dummies_id, dummies.value AS dummies_value \nFROM dummies \nWHERE dummies.id = %(param_1)s'] [parameters: {'param_1': 3397}]
```
Here's the PostgreSQL log (it's the same for 2.7 and 3.4):
```
2017-01-18 10:59:36 UTC [22429-1] LOG: database system was shut down at 2017-01-18 10:59:35 UTC
2017-01-18 10:59:36 UTC [22429-2] LOG: MultiXact member wraparound protections are now enabled
2017-01-18 10:59:36 UTC [22428-1] LOG: database system is ready to accept connections
2017-01-18 10:59:36 UTC [22433-1] LOG: autovacuum launcher started
2017-01-18 10:59:36 UTC [22435-1] [unknown]@[unknown] LOG: incomplete startup packet
2017-01-18 11:00:10 UTC [22466-1] user@db LOG: SSL error: decryption failed or bad record mac
2017-01-18 11:00:10 UTC [22466-2] user@db LOG: could not receive data from client: Connection reset by peer
```
(Note that the message about the incomplete startup packet is harmless.)
Quoting "How do I use engines / connections / sessions with Python multiprocessing, or os.fork()?" with added emphasis:
The SQLAlchemy Engine object refers to a connection pool of existing database connections. So when this object is replicated to a child process, the goal is to ensure that no database connections are carried over.
and
However, for the case of a transaction-active Session or Connection being shared, there’s no automatic fix for this; an application needs to ensure a new child process only initiate new Connection objects and transactions, as well as ORM Session objects.
The issue stems from the forked child process inheriting the live global `session`, which is holding on to a `Connection` (accessing `dummy.id` after `commit()` reloads the expired object, which checks out a connection and begins a new transaction). When `target` calls `init`, it overwrites the global references to `engine` and `session`, decreasing their refcounts to 0 in the child and forcing them to be finalized there. If you somehow create another reference to the inherited session in the child, you prevent it from being cleaned up, but don't do that. After `main` has joined the child and resumes, it tries to use the now potentially finalized (or otherwise out-of-sync) connection. The `SSL error: decryption failed or bad record mac` entry in the PostgreSQL log is consistent with two processes using the same SSL-encrypted connection. As to why this causes an error only after some number of iterations, I'm not sure.
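The finalization effect described above can be reproduced without PostgreSQL or SQLAlchemy. The following is a minimal sketch, assuming CPython's reference counting, Python 3.4+ and the POSIX-only `fork` start method; `FakeConnection`, `finalized_in`, `connection` and `demo` are hypothetical names standing in for the inherited `session` and its connection. It records which process runs the finalizer of an object held in a module-level global, as `session` is in the first example.

```python
import multiprocessing
import os


class FakeConnection(object):
    """Hypothetical stand-in for an object that owns a DBAPI connection."""

    def __init__(self, finalized_in):
        self.finalized_in = finalized_in  # shared int, visible across fork

    def __del__(self):
        # A real DBAPI connection would typically close its socket when
        # finalized. Here we only record which process ran the finalizer.
        self.finalized_in.value = os.getpid()


connection = None  # module-level global, like `session` in the question


def target():
    global connection
    # Like init() in the child: rebinding the inherited global drops the
    # child's only reference, so CPython finalizes the object right here.
    connection = None


def demo():
    global connection
    ctx = multiprocessing.get_context("fork")  # POSIX only, Python 3.4+
    finalized_in = ctx.Value("i", 0)
    connection = FakeConnection(finalized_in)
    p = ctx.Process(target=target)
    p.start()
    child_pid = p.pid
    p.join()
    finalized_pid = finalized_in.value
    connection = None  # now finalize the parent's own copy
    return child_pid, finalized_pid


if __name__ == "__main__":
    child_pid, finalized_pid = demo()
    print("finalized in child:", finalized_pid == child_pid)
```

On Linux this should report that the finalizer of the inherited object ran in the child. With a real connection in place of `FakeConnection`, that is presumably the point where the child tears down a socket the parent still believes it owns.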
With globals used the way you do, the only way to handle this situation is to close the session and call `engine.dispose()` before forking. This prevents connections from leaking to the child. For example:
```python
def main():
    global session
    init()
    try:
        dummy = Dummy(value=1)
        session.add(dummy)
        session.commit()
        dummy_id = dummy.id

        # Return the Connection to the pool
        session.close()
        # Dispose of it!
        engine.dispose()
        # ...or call your cleanup() function, which does the same

        p = multiprocessing.Process(target=target, args=(dummy_id,))
        p.start()
        p.join()

        # Start a new session
        session = Session()
        dummy = session.query(Dummy).get(dummy_id)
        assert dummy.value == 2
    finally:
        cleanup()
```
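Why releasing before the fork helps can be illustrated with a database-free sketch. It assumes CPython, Python 3.4+ and the POSIX `fork` start method, and `FakeConnection`, `finalized_in` and `demo` are hypothetical names, not SQLAlchemy API. The parent drops its reference before forking (standing in for `session.close()` followed by `engine.dispose()`), so when the child rebinds the global there is nothing inherited left to finalize.

```python
import multiprocessing
import os


class FakeConnection(object):
    """Hypothetical stand-in for an object that owns a DBAPI connection."""

    def __init__(self, finalized_in):
        self.finalized_in = finalized_in  # shared int, visible across fork

    def __del__(self):
        # Record which process ran the finalizer.
        self.finalized_in.value = os.getpid()


connection = None  # module-level global, like `session` in the question


def target():
    global connection
    connection = None  # like init() rebinding the globals in the child


def demo():
    global connection
    ctx = multiprocessing.get_context("fork")  # POSIX only, Python 3.4+
    finalized_in = ctx.Value("i", 0)
    connection = FakeConnection(finalized_in)
    # Release in the parent *before* forking, mirroring
    # session.close() followed by engine.dispose().
    connection = None
    p = ctx.Process(target=target)
    p.start()
    p.join()
    # Only the parent ever ran the finalizer; the child had nothing to drop.
    return finalized_in.value


if __name__ == "__main__":
    print(demo() == os.getpid())  # True
```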
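Your second example does not trigger finalization in the child, so it only seems to work; it might be as broken as the first, since the child still inherits a copy of the session and its connection defined locally in `main`. Those copies are never finalized in the child only because a `multiprocessing` child created by fork ends with `os._exit()` while `main`'s frame is still on its stack.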
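The claim about the second example can also be checked without a database. This is a minimal sketch under the same assumptions (CPython, Python 3.4+, POSIX `fork` start method; `FakeConnection`, `finalized_in` and `demo` are hypothetical names). The object is a local variable of the function that starts the child, like `session` inside `main`'s `with` block, and the child never finalizes its copy.

```python
import multiprocessing
import os


class FakeConnection(object):
    """Hypothetical stand-in for an object that owns a DBAPI connection."""

    def __init__(self, finalized_in):
        self.finalized_in = finalized_in  # shared int, visible across fork

    def __del__(self):
        # Record which process ran the finalizer.
        self.finalized_in.value = os.getpid()


def target():
    pass  # the child never touches the parent's local variable


def demo():
    ctx = multiprocessing.get_context("fork")  # POSIX only, Python 3.4+
    finalized_in = ctx.Value("i", 0)
    # A local, like `session` inside main()'s `with` block.
    conn = FakeConnection(finalized_in)
    p = ctx.Process(target=target)
    p.start()
    p.join()
    # The child ended via os._exit() with demo()'s frame (and its copy of
    # `conn`) still on its stack, so nothing was finalized: still 0.
    result = finalized_in.value
    del conn  # finalize the parent's copy explicitly
    return result


if __name__ == "__main__":
    print(demo())  # 0
```

The inherited copy is simply abandoned rather than cleaned up, which is why this variant does not crash, not evidence that sharing it is safe.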