Sharing a postgres connection pool between python multiprocessing workers

I am trying to use psycopg2's connection pool with Python's multiprocessing library.

Currently, attempting to share the connection pool amongst processes in the manner shown below causes:

psycopg2.OperationalError: SSL error: decryption failed or bad record mac

The following code should reproduce the error, with the caveat that the reader has to set up a simple Postgres database (a sketch of an assumed schema follows the code).


from multiprocessing import Pool
from psycopg2 import pool
import psycopg2
import psycopg2.extras

connection_pool = pool.ThreadedConnectionPool(1, 200, database='postgres',
    user='postgres', password='postgres', host='localhost')

class ConnectionFromPool:
    """
    Class to establish a connection with the local PostgreSQL database
    To use:
        query = "SELECT * FROM ticker_metadata"
        with ConnectionFromPool() as cursor:
            cursor.execute(query)
            results = cursor.fetchall()
    Returns:
        A list of dictionaries, one per row:
        [{...},{...},{...}]
    """
    def __init__(self):
        self.connection_pool = None
        self.cursor = None
        self.connection = None
    def __enter__(self):
        self.connection = connection_pool.getconn()
        self.cursor = self.connection.cursor(
            cursor_factory=psycopg2.extras.RealDictCursor)
        return self.cursor
    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_val is not None:
            self.connection.rollback()
        else:
            self.cursor.close()
            self.connection.commit()
        connection_pool.putconn(self.connection)


def test_query(col_attribute):
    """
    Simple SQL query
    """
    query = f"""SELECT *
                FROM col
                WHERE col = {col_attribute}
                ;"""
    with ConnectionFromPool() as cursor:
        cursor.execute(query)
        result = cursor.fetchall()
    return result


def multiprocessing(func, args, n_workers=2):
    """spawns multiple processes

    Args:
        func: function, to be performed
        args: list of args to be passed to each call of func
        n_workers: number of processes to be spawned

    Return:
        A list containing the results of each process
    """
    with Pool(processes=n_workers) as executor:
        res = executor.starmap(func, args)

    return list(res)


def main():
    args = [[i] for i in range(1000)]
    results = multiprocessing(test_query, args, 2)


if __name__ == "__main__":
    main()
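
The query in test_query only assumes a table named col with a single integer column, also named col. A minimal setup script might look like the following sketch (an assumed schema and assumed local credentials, not part of the original post):

import psycopg2

# Assumed setup: create the table and data that test_query above expects.
# Adjust the credentials to match your local server.
setup_connection = psycopg2.connect(database='postgres', user='postgres',
                                    password='postgres', host='localhost')
with setup_connection, setup_connection.cursor() as cursor:
    cursor.execute("CREATE TABLE IF NOT EXISTS col (col integer)")
    cursor.execute("INSERT INTO col (col) SELECT generate_series(0, 999)")
setup_connection.close()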

What I have already tried:

  1. Having each process open and close its own connection to the database, instead of attempting to use a connection pool. This is slow (see the sketch just after this list).
  2. Having each process use its own connection pool. This is also slow.
  3. Passing a psycopg2 connection object to each process, instead of having the connection acquired implicitly by the with statement in the query function. This throws an error claiming that the connection object is not picklable.
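
For reference, approach 1 looks roughly like the following sketch; the connection parameters mirror the ones in the question, and the function name is illustrative only:

import psycopg2
import psycopg2.extras

def test_query_own_connection(col_attribute):
    # Illustrative version of approach 1: each call opens and closes its
    # own connection, so nothing is shared across processes, but the
    # connection-setup cost is paid on every query.
    connection = psycopg2.connect(database='postgres', user='postgres',
                                  password='postgres', host='localhost')
    try:
        with connection.cursor(
                cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
            cursor.execute("SELECT * FROM col WHERE col = %s",
                           (col_attribute,))
            result = cursor.fetchall()
        connection.commit()
    finally:
        connection.close()
    return result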

Note: If I put a sleep operation in all but one of the processes, the non-sleeping process runs fine and executes its query until the remaining processes wake up, at which point I get the above error.

What I have already read:

  1. Share connection to postgres db across processes in Python
  2. Python: decryption failed or bad record mac when calling from Thread
  3. Connection problems with SQLAlchemy and multiple processes

Finally:

How can I use a connection pool (psycopg2) with Python's multiprocessing module? I am open to using other libraries as long as they work with Python and PostgreSQL databases.


1 Answer

Here is my solution. The solution can be stated in 2 parts:

  1. Have a wrapper function that is executed by each unique Process. The main purpose of this wrapper function is to create its own connection pool.
  2. For each query executed by the wrapper function in Step 1, pass that connection pool to the query function (in the example above, this is test_query).

In more detail with reference to the example in the question:

Step 1

Create the wrapper function that re-uses one connection pool per Process:

def multi_query(list_of_cols):
    # create a new connection pool per Process
    new_pool = new_connection_pool()

    # Pass the pool to each query
    for col in list_of_cols:
        test_query(col, new_pool)
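
The answer leaves new_connection_pool undefined. One possible implementation, which simply mirrors the pool parameters from the question (both this helper and the wiring below are assumptions, not part of the original answer), would be:

def new_connection_pool():
    # Hypothetical helper: build a fresh pool inside the child process,
    # reusing the connection parameters from the question.
    # `pool` is psycopg2.pool, imported at the top of the question's script.
    return pool.ThreadedConnectionPool(1, 200, database='postgres',
                                       user='postgres', password='postgres',
                                       host='localhost')

With that in place, main would hand each worker one chunk of column values instead of one value at a time, so that each Process builds exactly one pool:

def main():
    # Split the work into one chunk per worker; each chunk becomes the
    # single list_of_cols argument of one multi_query call.
    cols = list(range(1000))
    n_workers = 2
    chunks = [(cols[i::n_workers],) for i in range(n_workers)]
    multiprocessing(multi_query, chunks, n_workers)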

Step 2

Modify the query function to accept a connection pool:

Old test_query:

def test_query(col_attribute):
    """
    Simple SQL query
    """
    query = f"""SELECT *
                FROM col
                WHERE col = {col_attribute}
                ;"""
    with ConnectionFromPool() as cursor:
        cursor.execute(query)
        result = cursor.fetchall()
    return result

New test_query:

def test_query(col_attribute, connection_pool=None):
    """
    Simple SQL query
    """
    query = f"""SELECT *
                FROM col
                WHERE col = {col_attribute}
                ;"""
    with ConnectionFromPool(connection_pool) as cursor:
        cursor.execute(query)
        result = cursor.fetchall()
    return result
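
Note that ConnectionFromPool, as defined in the question, takes no constructor arguments and reads a module-level pool, so this change also implies adapting the class to accept the pool it should use. One way to do that (a sketch of the assumed change, not shown in the original answer) is:

class ConnectionFromPool:
    """Context manager that checks a connection out of the pool it is given."""
    def __init__(self, connection_pool=None):
        # The pool is now injected per call instead of being read from a
        # module-level global (assumed adaptation). The default of None only
        # mirrors the signature of the new test_query above; a per-process
        # pool is expected in practice.
        self.connection_pool = connection_pool
        self.connection = None
        self.cursor = None

    def __enter__(self):
        self.connection = self.connection_pool.getconn()
        self.cursor = self.connection.cursor(
            cursor_factory=psycopg2.extras.RealDictCursor)
        return self.cursor

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_val is not None:
            self.connection.rollback()
        else:
            self.connection.commit()
        self.cursor.close()
        self.connection_pool.putconn(self.connection)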