Python threads are not being garbage collected

Here is my threading setup. On my machine the maximum number of threads is 2047.

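from threading import Thread
try:
    from queue import Queue   # Python 3
except ImportError:
    from Queue import Queue   # Python 2 (this question uses 2.7)

class Worker(Thread):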
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as e:
                print(e)
            self.tasks.task_done()

class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

In other classes in my module, I use the ThreadPool class above to create a new pool of threads and then submit work to it. Here is an example:

def upload_images(self):
    '''batch uploads images to s3 via multi-threading'''
    num_threads = min(500, len(pictures))
    pool = ThreadPool(num_threads)

    for p in pictures:
        pool.add_task(p.get_set_upload_img)

    pool.wait_completion()
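For comparison, here is a sketch of the same batch pattern using the standard library's concurrent.futures.ThreadPoolExecutor (Python 3.2+). Leaving the with block calls shutdown(wait=True), which waits for the queued work and joins the worker threads, so nothing is left running between calls. fake_upload and upload_all are hypothetical stand-ins for p.get_set_upload_img and upload_images, not code from the question.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def fake_upload(name):
    """Hypothetical stand-in for p.get_set_upload_img; just returns a string."""
    return "uploaded " + name


def upload_all(names, max_workers=500):
    """Run fake_upload over names on a bounded pool that is torn down afterwards."""
    results = []
    # ThreadPoolExecutor rejects max_workers <= 0, so keep at least one worker.
    workers = min(max_workers, max(1, len(names)))
    # Exiting the with block calls executor.shutdown(wait=True), which waits
    # for all submitted work and joins every worker thread.
    with ThreadPoolExecutor(max_workers=workers) as executor:
        for result in executor.map(fake_upload, names):  # results keep input order
            results.append(result)
    return results


if __name__ == "__main__":
    before = threading.active_count()
    for _ in range(5):
        upload_all(["a.jpg", "b.jpg", "c.jpg"])
    # Threads from finished executors have exited, so this prints 0.
    print(threading.active_count() - before)
```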

The problem I am having is that these threads are not being garbage collected.

After a few runs, I get this error:

File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 495, in start
    _start_new_thread(self.__bootstrap, ())
thread.error: can't start new thread

This means I have hit the thread limit of 2047.

Any ideas? Thanks.

Asked Sep 04 '13 09:09 by Lucas Ou-Yang


1 Answer

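Your worker threads never return from run(), so they never end. A thread that is still running is never garbage collected (the threading module keeps a reference to it), and each ThreadPool starts up to 500 new daemon threads that stay blocked in self.tasks.get() after wait_completion() returns. A handful of upload_images calls is therefore enough to exceed the 2047-thread limit.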
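To make the leak visible, here is a scaled-down sketch, assuming Python 3, that builds pools the same way the question does (10 workers instead of 500) and counts live threads with threading.active_count(). run_pool is a hypothetical helper that mirrors upload_images; it is not part of the original code.

```python
import threading
from queue import Queue


class Worker(threading.Thread):
    """Same shape as the question's Worker: loops forever on tasks.get()."""

    def __init__(self, tasks):
        super().__init__(daemon=True)
        self.tasks = tasks
        self.start()

    def run(self):
        while True:
            # Blocks forever once the queue is empty, so run() never returns.
            func, args, kwargs = self.tasks.get()
            try:
                func(*args, **kwargs)
            finally:
                self.tasks.task_done()


def run_pool(num_threads, jobs):
    """Build a fresh pool, run the jobs, and wait for them (like upload_images)."""
    tasks = Queue(num_threads)
    for _ in range(num_threads):
        Worker(tasks)
    for job in jobs:
        tasks.put((job, (), {}))
    tasks.join()  # returns once every task is done, but the workers stay alive


if __name__ == "__main__":
    before = threading.active_count()
    for _ in range(3):
        run_pool(10, [lambda: None] * 20)
    # Each call leaves its 10 workers blocked in tasks.get(), so this prints 30.
    print(threading.active_count() - before)
```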

Perhaps something like the following for your run method?

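try:
    from queue import Empty   # Python 3
except ImportError:
    from Queue import Empty   # Python 2

def run(self):
    while True:
        try:
            # get() with no arguments blocks forever and never raises Empty,
            # so wait at most 1 second (an arbitrary choice) for a new task.
            func, args, kargs = self.tasks.get(timeout=1)
        except Empty:
            break  # queue stayed empty: let run() return so the thread ends

        try:
            func(*args, **kargs)
        except Exception as e:
            print(e)

        self.tasks.task_done()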
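Another way to make run() return, sketched here assuming Python 3, is a stop sentinel (often called a "poison pill"): wait_completion() waits for the queued tasks, puts one sentinel per worker on the queue, and then joins the workers, so every pool cleans up its own threads. _STOP and the workers attribute are names introduced for this sketch, not part of the question's code.

```python
import threading
from queue import Queue

_STOP = object()  # sentinel: a worker that receives it returns from run()


class Worker(threading.Thread):
    def __init__(self, tasks):
        super().__init__(daemon=True)
        self.tasks = tasks
        self.start()

    def run(self):
        while True:
            item = self.tasks.get()
            if item is _STOP:
                self.tasks.task_done()
                return  # run() returns, so the thread ends
            func, args, kwargs = item
            try:
                func(*args, **kwargs)
            except Exception as e:
                print(e)
            finally:
                self.tasks.task_done()  # called exactly once per task


class ThreadPool:
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        self.workers = [Worker(self.tasks) for _ in range(num_threads)]

    def add_task(self, func, *args, **kwargs):
        self.tasks.put((func, args, kwargs))

    def wait_completion(self):
        """Wait for queued tasks, then stop and join every worker."""
        self.tasks.join()
        for _ in self.workers:
            self.tasks.put(_STOP)  # each worker consumes exactly one sentinel
        for worker in self.workers:
            worker.join()


if __name__ == "__main__":
    before = threading.active_count()
    for _ in range(3):
        pool = ThreadPool(10)
        results = []
        for i in range(20):
            pool.add_task(results.append, i)
        pool.wait_completion()
    # All workers were joined, so this prints 0.
    print(threading.active_count() - before)
```

Reusing one long-lived pool across calls, instead of building a new one in every upload_images call, would also keep the thread count bounded.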
Answered Oct 16 '22 04:10 by Henry Gomersall