Python Postgres psycopg2 ThreadedConnectionPool exhausted

I have looked into several 'too many clients' related topics here but still can't solve my problem, so I have to ask again for my specific case.

Basically, I set up my local Postgres server and need to do tens of thousands of queries, so I used the Python psycopg2 package. Here is my code:

import psycopg2
import pandas as pd
import numpy as np
from flashtext import KeywordProcessor
from psycopg2.pool import ThreadedConnectionPool
from concurrent.futures import ThreadPoolExecutor

df = pd.DataFrame({'S':['California', 'Ohio', 'Texas'], 'T':['Dispatcher', 'Zookeeper', 'Mechanics']})
# df = pd.concat([df]*10000) # repeat df 10000 times

DSN = "postgresql://User:password@localhost/db"
tcp = ThreadedConnectionPool(1, 800, DSN)

def do_one_query(inputS, inputT):
    conn = tcp.getconn()
    c = conn.cursor()

    q = """SELECT * from eridata where "State" = 'California' and "Title" = 'Dispatcher' limit 1;"""

    c.execute(q)
    all_results = c.fetchall()
    for row in all_results:
        return row
    tcp.putconn(conn, close=True)

cnt=0
for idx, row in df.iterrows():

    cnt+=1
    with ThreadPoolExecutor(max_workers=1) as pool:
        ret = pool.submit(do_one_query,  row["S"], row["T"])
        print(ret.result())
    print(cnt)

The code runs well with a small df. If I repeat df 10000 times, I get an error message saying connection pool exhausted. I thought the connections I used had been closed by this line:

tcp.putconn(conn, close=True)

But I guess they are not actually closed? How can I get around this issue?

user3768495 asked Jan 30 '18 23:01


1 Answer

I've struggled to find really detailed information on how ThreadedConnectionPool works. https://bbengfort.github.io/observations/2017/12/06/psycopg2-transactions.html ain't bad, but its claim that getconn blocks until a connection becomes available turns out to be incorrect. Checking the code, all ThreadedConnectionPool adds is a lock around the AbstractConnectionPool methods to prevent race conditions. If you try to have more than maxconn connections checked out at any point, a PoolError with the message "connection pool exhausted" is raised.
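
To make that behaviour concrete, here is a toy model of it. This is only a sketch and not psycopg2's source: ToyPool and PoolExhausted are made-up names, and the "connections" are plain strings. All it is meant to show is that a lock around get/put prevents races but does nothing to make a caller wait for a free slot.

```python
import threading


class PoolExhausted(Exception):
    """Stand-in for psycopg2.pool.PoolError (hypothetical name)."""


class ToyPool:
    """Toy model only: a lock around get/put, no waiting for a free slot."""

    def __init__(self, maxconn):
        self.maxconn = maxconn
        self._lock = threading.Lock()
        self._used = set()
        self._next_id = 0

    def getconn(self):
        with self._lock:
            if len(self._used) >= self.maxconn:
                # No blocking here: the caller gets an error immediately.
                raise PoolExhausted("connection pool exhausted")
            self._next_id += 1
            conn = "conn-%d" % self._next_id  # placeholder for a real connection
            self._used.add(conn)
            return conn

    def putconn(self, conn):
        with self._lock:
            self._used.discard(conn)


pool = ToyPool(maxconn=2)
first = pool.getconn()
second = pool.getconn()
try:
    pool.getconn()  # a third checkout while two are still out
    exhausted = False
except PoolExhausted:
    exhausted = True

pool.putconn(first)
third = pool.getconn()  # works again once a slot has been handed back
```

In this model the third checkout fails straight away, and only succeeds after one of the earlier connections has been put back.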

If you want something a bit simpler than the accepted answer, wrapping the methods once more in a Semaphore, so that getconn blocks until a connection becomes available, should do the trick:

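from threading import Semaphore

from psycopg2.pool import ThreadedConnectionPool


class ReallyThreadedConnectionPool(ThreadedConnectionPool):
    """ThreadedConnectionPool whose getconn() waits instead of raising PoolError."""

    def __init__(self, minconn, maxconn, *args, **kwargs):
        # One permit per connection the pool is allowed to hand out.
        self._semaphore = Semaphore(maxconn)
        super().__init__(minconn, maxconn, *args, **kwargs)

    def getconn(self, *args, **kwargs):
        # Block until fewer than maxconn connections are checked out.
        self._semaphore.acquire()
        try:
            return super().getconn(*args, **kwargs)
        except Exception:
            # No connection was handed out, so give the permit back.
            self._semaphore.release()
            raise

    def putconn(self, *args, **kwargs):
        super().putconn(*args, **kwargs)
        # The connection is back in the pool; let one waiting caller proceed.
        self._semaphore.release()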
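
Blocking only helps if every connection actually goes back to the pool. In the question's do_one_query, the return inside the for loop leaves the function before tcp.putconn is reached whenever the query yields a row, so those connections are never returned. A minimal sketch of one way to guard against that follows. pooled_connection is a hypothetical helper name, not part of psycopg2, and passing the state and title as query parameters is my assumption about what the hard-coded query was meant to do.

```python
from contextlib import contextmanager


@contextmanager
def pooled_connection(pool):
    """Check a connection out of `pool` and always hand it back."""
    conn = pool.getconn()
    try:
        yield conn
    finally:
        # Runs on normal exit, on an early return in the caller, and on exceptions.
        pool.putconn(conn)


def do_one_query(pool, state, title):
    # Hypothetical rewrite of the question's function; the table and column
    # names are taken from the question.
    query = 'SELECT * FROM eridata WHERE "State" = %s AND "Title" = %s LIMIT 1;'
    with pooled_connection(pool) as conn:
        cur = conn.cursor()
        cur.execute(query, (state, title))
        return cur.fetchone()  # the connection still goes back to the pool
```

Since the helper only relies on getconn and putconn, it should work with either ThreadedConnectionPool or the ReallyThreadedConnectionPool above, for example do_one_query(tcp, row["S"], row["T"]).
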

Rune Lyngsoe answered Sep 30 '22 19:09