
ThreadPoolExecutor: how to limit the queue maxsize?


I am using the ThreadPoolExecutor class from the concurrent.futures package:

    from concurrent.futures import ThreadPoolExecutor

    def some_func(arg):
        # does some heavy lifting
        # outputs some results
        pass

    with ThreadPoolExecutor(max_workers=1) as executor:
        for arg in range(10000000):
            future = executor.submit(some_func, arg)

but I need to limit the queue size somehow, as I don't want millions of futures to be created at once. Is there a simple way to do it, or should I stick to queue.Queue and the threading package to accomplish this?

Bob asked Jan 15 '18 13:01




2 Answers

Python's ThreadPoolExecutor doesn't have the feature you're looking for, but the class can easily be subclassed as follows to provide it:

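    from concurrent import futures
    import queue

    class ThreadPoolExecutorWithQueueSizeLimit(futures.ThreadPoolExecutor):
        def __init__(self, maxsize=50, *args, **kwargs):
            super(ThreadPoolExecutorWithQueueSizeLimit, self).__init__(*args, **kwargs)
            # Replace the default unbounded work queue with a bounded one, so
            # submit() blocks while maxsize work items are already waiting.
            # Note: _work_queue is a private attribute of ThreadPoolExecutor.
            self._work_queue = queue.Queue(maxsize=maxsize)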
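As a rough usage sketch of the subclass above: the snippet below redefines it so that it runs on its own, then submits a small batch of tasks and records how many work items were waiting after each submit(). The helper run_bounded, the stand-in some_func, and the chosen sizes are assumptions made for illustration. Since _work_queue is a private attribute, the behaviour may differ between Python versions.

```python
import queue
import time
from concurrent import futures


class ThreadPoolExecutorWithQueueSizeLimit(futures.ThreadPoolExecutor):
    def __init__(self, maxsize=50, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Private attribute: swapping it relies on implementation details.
        self._work_queue = queue.Queue(maxsize=maxsize)


def some_func(arg):
    # Stand-in for the "heavy lifting" in the question (hypothetical).
    time.sleep(0.001)
    return arg * 2


def run_bounded(n_tasks=20, maxsize=3):
    """Submit n_tasks and return (largest queue size seen, results)."""
    observed = []
    pending = []
    with ThreadPoolExecutorWithQueueSizeLimit(maxsize=maxsize, max_workers=1) as executor:
        for arg in range(n_tasks):
            # Blocks here while maxsize work items are already queued.
            pending.append(executor.submit(some_func, arg))
            observed.append(executor._work_queue.qsize())
        results = [f.result() for f in pending]
    return max(observed), results


if __name__ == "__main__":
    peak, results = run_bounded()
    print("largest queue size seen:", peak)
    print("number of results:", len(results))
```

The sketch keeps every future in a list only to check the results; with millions of tasks you would drop or process each future as it finishes rather than hold references to all of them.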
andres.riancho answered Oct 17 '22 21:10


    from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

    limit = 10

    futures = set()

    with ThreadPoolExecutor(max_workers=1) as executor:
        for arg in range(10000000):
            if len(futures) >= limit:
                completed, futures = wait(futures, return_when=FIRST_COMPLETED)
            futures.add(executor.submit(some_func, arg))
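The loop above never holds more than limit unfinished futures, because it waits for at least one to complete before submitting the next. A self-contained sketch of the same idea follows, wrapped in a function that also collects the results and drains the remaining futures at the end. The function name run_throttled, the stand-in some_func, and the peak counter are assumptions added for illustration, not part of the original answer.

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED


def some_func(arg):
    # Stand-in for the real workload (hypothetical).
    return arg * arg


def run_throttled(args, limit=10, max_workers=1):
    """Run some_func over args with at most `limit` futures in flight."""
    results = []
    in_flight = set()
    peak = 0  # largest number of unfinished futures held at once
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for arg in args:
            if len(in_flight) >= limit:
                # Block until at least one future finishes, then keep
                # only the futures that are still running or queued.
                done, in_flight = wait(in_flight, return_when=FIRST_COMPLETED)
                results.extend(f.result() for f in done)
            in_flight.add(executor.submit(some_func, arg))
            peak = max(peak, len(in_flight))
        # Collect whatever is still outstanding after the loop.
        done, _ = wait(in_flight)
        results.extend(f.result() for f in done)
    return results, peak


if __name__ == "__main__":
    results, peak = run_throttled(range(50), limit=10)
    print("results collected:", len(results))
    print("peak futures in flight:", peak)
```

Results arrive in completion order rather than submission order, so sort them or carry the argument along with each result if order matters.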
Bob answered Oct 17 '22 20:10