I am trying to use psycopg2's connection pool with Python's multiprocessing library.
Currently, attempting to share one connection pool among the worker processes, as the code below does, causes:
psycopg2.OperationalError: SSL error: decryption failed or bad record mac
The following code should reproduce the error, with the caveat that the reader has to set up a simple PostgreSQL database.
```python
from multiprocessing import Pool

import psycopg2
import psycopg2.extras
from psycopg2 import pool

# Created once, in the parent process. With the fork start method every worker
# inherits a copy of this pool, including the connection it opened at creation
# (minconn=1), so several processes can end up using the same SSL socket.
connection_pool = pool.ThreadedConnectionPool(
    1, 200, database='postgres',
    user='postgres', password='postgres', host='localhost')


class ConnectionFromPool:
    """
    Class to establish a connection with the local PostgreSQL database
    To use:
        query = "SELECT * FROM ticker_metadata"
        with ConnectionFromPool() as cursor:
            cursor.execute(query)
            results = cursor.fetchall()
    Returns:
        A list of dictionaries, one per row
        [{...}, {...}, {...}]
    """

    def __init__(self):
        self.cursor = None
        self.connection = None

    def __enter__(self):
        self.connection = connection_pool.getconn()
        self.cursor = self.connection.cursor(
            cursor_factory=psycopg2.extras.RealDictCursor)
        return self.cursor

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            self.cursor.close()
            if exc_val is not None:
                self.connection.rollback()
            else:
                self.connection.commit()
        finally:
            # Always hand the connection back to the pool, even after an error
            connection_pool.putconn(self.connection)


def test_query(col_attribute):
    """
    Simple SQL query
    """
    # Pass the value as a query parameter instead of formatting it into the SQL
    query = "SELECT * FROM col WHERE col = %s;"
    with ConnectionFromPool() as cursor:
        cursor.execute(query, (col_attribute,))
        result = cursor.fetchall()
    return result


def multiprocessing(func, args, n_workers=2):
    """Spawns multiple processes.
    Args:
        func: function to be performed
        args: list of args to be passed to each call of func
        n_workers: number of processes to be spawned
    Return:
        A list containing the results of each process
    """
    with Pool(processes=n_workers) as executor:
        res = executor.starmap(func, args)
    return list(res)


def main():
    args = [[i] for i in range(1000)]
    results = multiprocessing(test_query, args, 2)


if __name__ == "__main__":
    main()
```
What I have already tried:
… `with` statement in the SQL query. This throws an error claiming that the connection object is not picklable.

Note: If I put a sleep operation in all but one of the processes, the non-sleeping process runs fine and executes its query until the remaining processes wake up; then I get the above error.
Finally:
How can I use a connection pool (psycopg2) with Python's multiprocessing library? I am open to using other libraries, as long as they work with Python and PostgreSQL.
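Here is my solution. It has two parts:

1. Create a wrapper function that each process runs. Its job is to create that process's own connection pool.
2. For every query the wrapper runs, pass that pool to the query function (in the question's example, `test_query`).

In more detail, with reference to the example in the question: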
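Create the wrapper function, which creates one connection pool per process and re-uses it for all of that process's queries:
```python
def multi_query(list_of_cols):
    # Create a new connection pool inside this process, after the fork, so
    # nothing is shared with the parent or the other workers.
    # new_connection_pool() is not shown in this answer; it is assumed to
    # build a pool such as pool.ThreadedConnectionPool(...).
    new_pool = new_connection_pool()
    try:
        # Pass the same per-process pool to each query
        return [test_query(col, new_pool) for col in list_of_cols]
    finally:
        new_pool.closeall()
```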
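The answer does not show how `multi_query` is launched. Because it takes a whole list of columns, the question's 1000 values have to be split into one list per worker first. A minimal sketch, assuming the question's `multiprocessing(func, args, n_workers)` helper is reused unchanged; `split_into_chunks` and `build_starmap_args` are hypothetical helpers, not part of the original code.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def split_into_chunks(items: Sequence[T], n_chunks: int) -> List[List[T]]:
    """Split items into n_chunks lists whose sizes differ by at most one."""
    if n_chunks < 1:
        raise ValueError("n_chunks must be at least 1")
    base, extra = divmod(len(items), n_chunks)
    chunks = []
    start = 0
    for i in range(n_chunks):
        # The first `extra` chunks each take one additional item
        size = base + (1 if i < extra else 0)
        chunks.append(list(items[start:start + size]))
        start += size
    return chunks


def build_starmap_args(items: Sequence[T], n_workers: int) -> List[List[List[T]]]:
    """Wrap each non-empty chunk in its own argument list for Pool.starmap.

    Each inner [chunk] becomes one call: multi_query(chunk).
    """
    return [[chunk] for chunk in split_into_chunks(items, n_workers) if chunk]
```

Usage: `multiprocessing(multi_query, build_starmap_args(list(range(1000)), 2), 2)` runs one `multi_query` call per chunk, so each worker builds a single pool for its share of the queries. The result is a list with one entry per chunk, assuming `multi_query` returns its query results.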
Modify the query function to accept a connection pool:
Old `test_query`:
```python
def test_query(col_attribute):
    """
    Simple SQL query
    """
    query = "SELECT * FROM col WHERE col = %s;"
    # Always borrows from the module-level pool created in the parent process
    with ConnectionFromPool() as cursor:
        cursor.execute(query, (col_attribute,))
        result = cursor.fetchall()
    return result
```
New `test_query`:
```python
def test_query(col_attribute, connection_pool=None):
    """
    Simple SQL query
    """
    query = "SELECT * FROM col WHERE col = %s;"
    # ConnectionFromPool must be changed to accept the pool it borrows from
    with ConnectionFromPool(connection_pool) as cursor:
        cursor.execute(query, (col_attribute,))
        result = cursor.fetchall()
    return result
```
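The new `test_query` passes the pool into `ConnectionFromPool`, but the class in the question takes no arguments and always uses the module-level `connection_pool`. A minimal sketch of the change this implies, assuming the pool is always passed explicitly (passing `None` raises an error instead of silently falling back to a shared pool). The `cursor_factory` parameter is an illustrative addition: pass `psycopg2.extras.RealDictCursor` to keep the dictionary rows from the question. The class itself only relies on psycopg2's `getconn()`/`putconn()` pool methods.

```python
class ConnectionFromPool:
    """Borrow a connection from an explicitly passed pool for one `with` block.

    Usage (inside a worker process that owns `pool`):
        with ConnectionFromPool(pool, cursor_factory=RealDictCursor) as cursor:
            cursor.execute("SELECT 1")
    """

    def __init__(self, connection_pool, cursor_factory=None):
        if connection_pool is None:
            raise ValueError("a per-process connection pool is required")
        self.connection_pool = connection_pool
        self.cursor_factory = cursor_factory
        self.connection = None
        self.cursor = None

    def __enter__(self):
        self.connection = self.connection_pool.getconn()
        try:
            # cursor_factory=None gives psycopg2's default tuple-row cursor
            self.cursor = self.connection.cursor(cursor_factory=self.cursor_factory)
        except Exception:
            # Do not leak the connection if the cursor cannot be created
            self.connection_pool.putconn(self.connection)
            raise
        return self.cursor

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            self.cursor.close()
            if exc_type is not None:
                self.connection.rollback()
            else:
                self.connection.commit()
        finally:
            # Return the connection to the pool it came from, even on errors
            self.connection_pool.putconn(self.connection)
        return False  # re-raise any exception from the with-block
```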