
Python thread pool that handles exceptions


I've been looking for a good implementation of a simple Python thread pool pattern and really can't find anything that suits my needs. I'm using Python 2.7, and all the modules I have found either don't work or don't handle exceptions in the workers properly. Does anyone know of a library that offers the kind of functionality I'm looking for? Any help is greatly appreciated.

Multiprocessing

My first attempt was with the built-in multiprocessing module. Because it uses subprocesses rather than threads, everything sent to the workers has to be pickled, and in Python 2 bound methods such as s.compute_fib cannot be pickled. No go here.

```python
from multiprocessing import Pool

class Sample(object):
    def compute_fib(self, n):
        phi = (1 + 5**0.5) / 2
        self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = Pool(processes=8)
for s in samples: pool.apply_async(s.compute_fib, [20])
pool.close()  # Pool.join() requires close() (or terminate()) first
pool.join()
for s in samples: print s.fib

# PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed
```
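If running the work in threads is acceptable, one option worth trying is multiprocessing.pool.ThreadPool: it lives in the same module and exposes the same Pool interface, but runs tasks in threads of the current process, so bound methods are never pickled and the workers mutate the original Sample objects. A minimal sketch, written for Python 3 and assuming the tasks are thread-safe:

```python
from multiprocessing.pool import ThreadPool


class Sample(object):
    def compute_fib(self, n):
        # Binet's formula, rounded to the nearest integer
        phi = (1 + 5 ** 0.5) / 2
        self.fib = int(round((phi ** n - (1 - phi) ** n) / 5 ** 0.5))


samples = [Sample() for _ in range(8)]
pool = ThreadPool(processes=8)

# Bound methods are fine here: the tasks run in threads of this process,
# so nothing needs to be pickled.
results = [pool.apply_async(s.compute_fib, [20]) for s in samples]
pool.close()
pool.join()

for r in results:
    r.get()  # re-raises the worker's exception, if any, in this thread

for s in samples:
    print(s.fib)  # prints 6765 eight times
```

Calling get() on each AsyncResult is what makes a worker failure stop the script instead of being silently dropped. Because of the GIL, CPU-bound work like this will not actually run in parallel; the pool mainly helps with I/O-bound tasks.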

Futures

I see there is a backport of some of the concurrent features of Python 3.2 here. This seems perfect and simple to use. The problem is that when an exception is raised in one of the workers, you only get the exception type and message, such as "ZeroDivisionError", but no traceback, and thus no indication of which line caused the exception. This makes the code very hard to debug. No go.

```python
from concurrent import futures

class Sample(object):
    def compute_fib(self, n):
        phi = (1 + 5**0.5) / 2
        1/0
        self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = futures.ThreadPoolExecutor(max_workers=8)
threads = [pool.submit(s.compute_fib, 20) for s in samples]
futures.wait(threads, return_when=futures.FIRST_EXCEPTION)
for t in threads: t.result()
for s in samples: print s.fib

#    futures-2.1.3-py2.7.egg/concurrent/futures/_base.pyc in __get_result(self)
#    354     def __get_result(self):
#    355         if self._exception:
#--> 356             raise self._exception
#    357         else:
#    358             return self._result
#
# ZeroDivisionError: integer division or modulo by zero
```
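One way to keep the missing traceback, sketched under the assumption that wrapping each task is acceptable: format the traceback inside the worker thread, where it is still intact, and raise a new exception carrying that text. WorkerError and capture_traceback are hypothetical names introduced for illustration. Since the sketch only relies on traceback.format_exc(), it should also work with the Python 2 backport, though it was written against Python 3:

```python
import traceback
from concurrent import futures


class WorkerError(Exception):
    """Hypothetical exception type carrying a worker's formatted traceback."""


def capture_traceback(fn):
    """Hypothetical helper: wrap fn so a failure keeps its traceback text."""
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception:
            # format_exc() runs inside the worker thread, while the original
            # traceback (including the failing frame) is still available.
            raise WorkerError(traceback.format_exc())
    return wrapper


class Sample(object):
    def compute_fib(self, n):
        phi = (1 + 5 ** 0.5) / 2
        1 / 0
        self.fib = int(round((phi ** n - (1 - phi) ** n) / 5 ** 0.5))


samples = [Sample() for _ in range(8)]
with futures.ThreadPoolExecutor(max_workers=8) as pool:
    jobs = [pool.submit(capture_traceback(s.compute_fib), 20) for s in samples]
    futures.wait(jobs, return_when=futures.FIRST_EXCEPTION)

# Leaving the with-block waits for every job, so exception() no longer blocks.
errors = [job.exception() for job in jobs if job.exception() is not None]
if errors:
    print(errors[0])  # the worker's full traceback, as text
```

Calling job.result() instead of job.exception() would raise the WorkerError in the main thread, and its message would point at the line inside compute_fib.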

Workerpool

I found another implementation of this pattern here. This time, when an exception occurs it is printed, but then my IPython interactive interpreter is left hanging and has to be killed from another shell. No go.

```python
import workerpool

class Sample(object):
    def compute_fib(self, n):
        phi = (1 + 5**0.5) / 2
        1/0
        self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = workerpool.WorkerPool(size=8)
for s in samples: pool.map(s.compute_fib, [20])
pool.wait()
for s in samples: print s.fib

# ZeroDivisionError: integer division or modulo by zero
# ^C^C^C^C^C^C^C^C^D^D
# $ kill 1783
```
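A common way for a queue-based pool to hang is a worker thread that dies before calling Queue.task_done(), which leaves Queue.join() waiting forever. Whether that is what happens inside workerpool is an assumption, not something verified here. A minimal hand-rolled sketch that avoids it by calling task_done() in a finally clause and recording failures instead of letting the thread die. run_all is a hypothetical helper; the code is Python 3 (in Python 2.7 the module is named Queue):

```python
import queue
import sys
import threading


def run_all(tasks, num_workers=4):
    """Hypothetical helper: run (fn, args) pairs on a small thread pool.

    Returns a list of sys.exc_info() tuples, one per failed task.
    """
    q = queue.Queue()
    errors = []
    lock = threading.Lock()

    def worker():
        while True:
            item = q.get()
            if item is None:      # sentinel: shut this worker down
                q.task_done()
                return
            fn, args = item
            try:
                fn(*args)
            except Exception:
                with lock:
                    errors.append(sys.exc_info())  # keeps the traceback object
            finally:
                # Always mark the task done, even on failure; otherwise
                # q.join() below would block forever.
                q.task_done()

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.daemon = True
        t.start()
    for task in tasks:
        q.put(task)
    for _ in threads:
        q.put(None)               # one sentinel per worker
    q.join()
    for t in threads:
        t.join()
    return errors


def bad(x):
    return x / 0


errs = run_all([(bad, (i,)) for i in range(8)])
```

Here errs holds eight (type, value, traceback) tuples, and the caller can decide whether to re-raise the first one or report them all.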

Threadpool

Yet another implementation here. This time, when an exception occurs it is printed to stderr, but the script is not interrupted and keeps executing, which defeats the purpose of raising an exception and can make things unsafe. Still not usable.

```python
import threadpool

class Sample(object):
    def compute_fib(self, n):
        phi = (1 + 5**0.5) / 2
        1/0
        self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = threadpool.ThreadPool(8)
requests = [threadpool.makeRequests(s.compute_fib, [20]) for s in samples]
requests = [y for x in requests for y in x]
for r in requests: pool.putRequest(r)
pool.wait()
for s in samples: print s.fib

# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
#---> 17 for s in samples: print s.fib
#
#AttributeError: 'Sample' object has no attribute 'fib'
```
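For the fail-fast behaviour asked for here, a sketch on top of Python 3's concurrent.futures: iterate the futures with as_completed(), call result() so the first failure is re-raised in the caller, and cancel whatever has not started yet. run_fail_fast is a hypothetical helper name:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_fail_fast(fn, items, max_workers=8):
    """Hypothetical helper: apply fn to every item, stopping at the first error.

    Results are returned in completion order, not submission order.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        jobs = [pool.submit(fn, item) for item in items]
        try:
            return [job.result() for job in as_completed(jobs)]
        except Exception:
            for job in jobs:
                job.cancel()  # only affects tasks that have not started yet
            raise             # propagate the worker's exception to the caller


def div_zero(x):
    return x / 0
```

Calling run_fail_fast(div_zero, range(4)) raises ZeroDivisionError in the calling thread, so the script stops instead of carrying on. Tasks that are already running are still waited for when the with-block exits; only pending ones are cancelled.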

- Update -

It appears that, for the futures library, Python 3 behaves differently from Python 2: in Python 3 the re-raised exception keeps the traceback from inside the worker.

futures_exceptions.py:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def div_zero(x):
    return x / 0

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = executor.map(div_zero, range(4))
    for future in as_completed(futures):
        print(future)
```

Python 2.7.6 output:

```
Traceback (most recent call last):
  File "...futures_exceptions.py", line 12, in <module>
    for future in as_completed(futures):
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 198, in as_completed
    with _AcquireFutures(fs):
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 147, in __init__
    self.futures = sorted(futures, key=id)
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 549, in map
    yield future.result()
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 397, in result
    return self.__get_result()
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 356, in __get_result
    raise self._exception
ZeroDivisionError: integer division or modulo by zero
```

Python 3.3.2 output:

```
Traceback (most recent call last):
  File "...futures_exceptions.py", line 11, in <module>
    for future in as_completed(futures):
  File "...python3.3/concurrent/futures/_base.py", line 193, in as_completed
    with _AcquireFutures(fs):
  File "...python3.3/concurrent/futures/_base.py", line 142, in __init__
    self.futures = sorted(futures, key=id)
  File "...python3.3/concurrent/futures/_base.py", line 546, in result_iterator
    yield future.result()
  File "...python3.3/concurrent/futures/_base.py", line 392, in result
    return self.__get_result()
  File "...python3.3/concurrent/futures/_base.py", line 351, in __get_result
    raise self._exception
  File "...python3.3/concurrent/futures/thread.py", line 54, in run
    result = self.fn(*self.args, **self.kwargs)
  File "...futures_exceptions.py", line 7, in div_zero
    return x / 0
ZeroDivisionError: division by zero
```
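A side note on futures_exceptions.py: executor.map() returns an iterator over results, not futures, so as_completed(futures) ends up consuming that iterator (the sorted(futures, key=id) frame in both tracebacks), and the exception surfaces there rather than from a future. A sketch that submits futures explicitly and inspects each failure, assuming Python 3 so that exc.__traceback__ is available:

```python
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed


def div_zero(x):
    return x / 0


failures = []
with ThreadPoolExecutor(max_workers=4) as executor:
    # submit() returns Future objects, which is what as_completed() expects.
    futures = [executor.submit(div_zero, i) for i in range(4)]
    for future in as_completed(futures):
        exc = future.exception()  # None if the call succeeded
        if exc is not None:
            failures.append(exc)
            # In Python 3 the traceback still includes the worker's frames,
            # down to the line inside div_zero.
            traceback.print_exception(type(exc), exc, exc.__traceback__)
```

Using future.exception() reports every failure without stopping; calling future.result() instead would re-raise the first one in the main thread.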
asked Mar 12 '13 at 10:03 by xApple





2 Answers

I personally use concurrent.futures because the interface is very simple. For the traceback issue, I found a workaround that preserves it. Check out my answer to this other question:

Getting original line number for exception in concurrent.futures

answered Oct 09 '22 at 03:10 by se7entyse7en



If you want to get information about unhandled exceptions in threads and you use ThreadPoolExecutor, you can do something like this (Python 3, since it relies on e.__traceback__):

```python
import time
import traceback

from concurrent.futures import ThreadPoolExecutor


def worker():
    a = 2 / 0


def worker_callbacks(f):
    e = f.exception()

    if e is None:
        return

    trace = []
    tb = e.__traceback__
    while tb is not None:
        trace.append({
            "filename": tb.tb_frame.f_code.co_filename,
            "name": tb.tb_frame.f_code.co_name,
            "lineno": tb.tb_lineno
        })
        tb = tb.tb_next
    print(str({
        'type': type(e).__name__,
        'message': str(e),
        'trace': trace
    }))


executor = ThreadPoolExecutor(max_workers=1)
executor.submit(worker).add_done_callback(worker_callbacks)
```
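The manual walk over tb.tb_next can also be written with traceback.extract_tb(), which returns FrameSummary objects exposing the same filename, name and lineno values. A sketch of an equivalent callback; the reports list is a hypothetical addition that just collects the output, and the with-block makes sure the executor waits for the task (and its callback) before the script moves on:

```python
import traceback
from concurrent.futures import ThreadPoolExecutor

reports = []  # hypothetical: collects one dict per failed task


def worker():
    return 2 / 0


def worker_callback(f):
    e = f.exception()
    if e is None:
        return
    # extract_tb() walks e.__traceback__ and yields FrameSummary objects
    # carrying the same filename / name / lineno fields as the loop above.
    trace = [
        {"filename": fs.filename, "name": fs.name, "lineno": fs.lineno}
        for fs in traceback.extract_tb(e.__traceback__)
    ]
    report = {"type": type(e).__name__, "message": str(e), "trace": trace}
    reports.append(report)
    print(report)


with ThreadPoolExecutor(max_workers=1) as executor:
    executor.submit(worker).add_done_callback(worker_callback)
```

The last entry in trace is the worker frame, which is the line information the question was missing.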
answered Oct 09 '22 at 03:10 by Alexander
