
Django ORM leaks connections when using ThreadPoolExecutor

I'm using ThreadPoolExecutor to speed up data processing. The problem is that the worker threads open new database connections and Django never closes them. I have CONN_MAX_AGE set in settings.py, and I have already tried calling django.db.close_old_connections().

Here is a code example:

from concurrent.futures import ThreadPoolExecutor, wait

def compute(job):
    # aggregate() returns a dict, so unpack it into keyword arguments
    result = FooModel.objects.filter(...).aggregate(...)
    return BarModel.objects.create(**result)

def process(dataset):
    thread_pool = ThreadPoolExecutor(max_workers=20)
    futures = []

    for job in dataset:
        futures.append(thread_pool.submit(compute, job))

    # wait() blocks until every future has finished
    results = [f.result() for f in wait(futures).done]
    return results

for i in range(100):
    process(['foo', 'bar', 'qux'])

Is Django ORM able to terminate idle DB connections if they were started in another thread?


UPD: Interestingly, Django doesn't even know about these connections:

>>> from django.db import connections
>>> print(len(connections.all()))
2

mypostgresdb=# select count(*) from pg_stat_activity;
 count 
-------
   182
(1 row)

And all worker threads had definitely exited by then:

>>> # only the main thread is left
>>> import threading
>>> threading.enumerate()
[<_MainThread(MainThread, started 140660203321088)>]
Max Malysh asked Feb 04 '23 17:02



2 Answers

My guess is that the ThreadPoolExecutor is not what is creating the DB connection, but the threaded jobs are the ones holding the connection. I've had to deal with this already.
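
Django keeps database connections per thread, which is why the main thread in the question cannot see (or close) the connections its workers opened. The sketch below illustrates that behaviour with the standard library only. FakeConnection and get_connection are hypothetical stand-ins invented for this example, not Django APIs, and threading.local is used here merely to imitate per-thread storage.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins, not Django APIs: they only imitate the fact that
# each thread gets its own connection object.
_local = threading.local()
opened = []                      # every connection ever opened, in any thread
opened_lock = threading.Lock()


class FakeConnection:
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def get_connection():
    """Return the calling thread's connection, opening one on first use."""
    conn = getattr(_local, "conn", None)
    if conn is None:
        conn = _local.conn = FakeConnection()
        with opened_lock:
            opened.append(conn)
    return conn


def job(_):
    get_connection()             # each worker lazily opens its own connection
    return threading.get_ident()


with ThreadPoolExecutor(max_workers=4) as pool:
    worker_ids = set(pool.map(job, range(20)))

# The main thread never opened a connection, so it has nothing to close,
# yet every worker's connection is still open after the pool has shut down.
main_has_connection = hasattr(_local, "conn")
still_open = [c for c in opened if not c.closed]
```

Nothing in the main thread can reach the workers' entries in `_local`, so the only place to close a connection is inside the thread that opened it.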

I ended up building this wrapper to ensure that connections are closed explicitly whenever a job finishes in a ThreadPoolExecutor. It closes the connection inside the worker thread itself, and so far I haven't seen any leaks while using this code.

from functools import wraps
from concurrent.futures import ThreadPoolExecutor
from django.db import connection

class DjangoConnectionThreadPoolExecutor(ThreadPoolExecutor):
    """
    When a function is passed into the ThreadPoolExecutor via either submit() or map(), 
    this will wrap the function, and make sure that close_django_db_connection() is called 
    inside the thread when it's finished so Django doesn't leak DB connections.

    Since map() calls submit(), only submit() needs to be overridden.
    """
    def close_django_db_connection(self):
        connection.close()

    def generate_thread_closing_wrapper(self, fn):
        @wraps(fn)
        def new_func(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            finally:
                self.close_django_db_connection()
        return new_func

    def submit(*args, **kwargs):
        """
        I took the args filtering/unpacking logic from

        https://github.com/python/cpython/blob/3.7/Lib/concurrent/futures/thread.py

        so I can properly get the function object the same way it was done there.
        """
        if len(args) >= 2:
            self, fn, *args = args
        elif not args:
            raise TypeError("descriptor 'submit' of 'ThreadPoolExecutor' object "
                            "needs an argument")
        elif 'fn' in kwargs:
            fn = kwargs.pop('fn')
            self, *args = args
        else:
            # Without this branch, fn would be undefined below.
            raise TypeError('submit expected at least 1 positional argument, '
                            'got %d' % (len(args) - 1))

        fn = self.generate_thread_closing_wrapper(fn=fn)
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever if this executor is ever subclassed.
        return super(DjangoConnectionThreadPoolExecutor, self).submit(fn, *args, **kwargs)

Then you can just use this:

    with DjangoConnectionThreadPoolExecutor(max_workers=15) as executor:
        results = list(executor.map(func, args_list))

...and be confident that the connections will close.
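
On Python 3.8 and later, positional-only parameters make the manual argument unpacking unnecessary. Here is a shorter sketch of the same idea, not a drop-in replacement for the class above. CleanupThreadPoolExecutor and its `cleanup` parameter are names made up for this example; taking the cleanup function as an argument keeps the block runnable without Django. In a Django project one would presumably pass `django.db.connections.close_all`.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import wraps


class CleanupThreadPoolExecutor(ThreadPoolExecutor):
    """Call cleanup() in the worker thread after every submitted job.

    Hypothetical class for illustration; `cleanup` is an assumption of this
    sketch (for Django, e.g. django.db.connections.close_all).
    """

    def __init__(self, *args, cleanup, **kwargs):
        super().__init__(*args, **kwargs)
        self._cleanup = cleanup

    def submit(self, fn, /, *args, **kwargs):
        @wraps(fn)
        def wrapper(*a, **kw):
            try:
                return fn(*a, **kw)
            finally:
                # Runs in the same thread as fn, even if fn raised.
                self._cleanup()
        return super().submit(wrapper, *args, **kwargs)


def double(x):
    return x * 2


def fail():
    raise ValueError("boom")


# A list of markers stands in for the real cleanup so the calls can be counted.
calls = []
with CleanupThreadPoolExecutor(max_workers=3, cleanup=lambda: calls.append(1)) as ex:
    results = list(ex.map(double, range(5)))   # map() goes through submit()
    failing = ex.submit(fail)                  # cleanup still runs on error
```

Because the cleanup sits in a `finally` block inside the wrapper, it runs once per job in the worker thread, whether the job returns or raises.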

Dougyfresh answered Feb 06 '23 09:02



The full document is here

The solution is to find a point at which to actively close the database connection. In our project, every time a worker thread completes a task, it closes its connections. Because we use ThreadPoolExecutor, this is easy to do with a done callback. The code is as follows:

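from concurrent.futures import ThreadPoolExecutor

from django.db import connections

def on_done(future):
    # Close the connections belonging to the thread that runs this callback.
    connections.close_all()

def main():
    # do() and get_a_job() stand for the project's own job function and job source.
    with ThreadPoolExecutor() as executor:
        while True:
            future = executor.submit(do, get_a_job())
            future.add_done_callback(on_done)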
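
One caveat with done callbacks: a callback normally runs in the worker thread that finished the job, but if the future is already done by the time add_done_callback() is called, the callback runs immediately in the calling thread. In that case, closing connections from the callback would act on the caller's connections rather than the worker's. The sketch below shows both cases using only the standard library; `record` and `ran_in` are illustrative names.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

main_id = threading.get_ident()
ran_in = {}                      # label -> id of the thread that ran the callback


def record(label):
    def callback(future):
        ran_in[label] = threading.get_ident()
    return callback


gate = threading.Event()

with ThreadPoolExecutor(max_workers=1) as executor:
    # Case 1: the callback is attached before the job can finish.
    slow = executor.submit(gate.wait)
    slow.add_done_callback(record("attached_early"))
    gate.set()

    # Case 2: the callback is attached after the job has already finished.
    fast = executor.submit(lambda: None)
    fast.result()
    fast.add_done_callback(record("attached_late"))

early_ran_in_worker = ran_in["attached_early"] != main_id
late_ran_in_main = ran_in["attached_late"] == main_id
```

For very short jobs the second case is a real possibility, so wrapping the job function itself with a try/finally is the safer way to guarantee that the cleanup happens in the worker thread.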
rmaleki answered Feb 06 '23 08:02
