How can I limit the number of concurrent threads in Python?
For example, I have a directory with many files, and I want to process all of them, but only 4 at a time in parallel.
Here is what I have so far:
import glob
import Queue
import threading

def process_file(fname):
    # open file and do something

def process_file_thread(queue, fname):
    queue.put(process_file(fname))

def process_all_files(d):
    files = glob.glob(d + '/*')
    q = Queue.Queue()
    for fname in files:
        t = threading.Thread(target=process_file_thread, args=(q, fname))
        t.start()
    q.join()

def main():
    process_all_files('.')
    # Do something after all files have been processed
How can I modify the code so that only 4 threads are run at a time?
Note that I want to wait for all files to be processed and then continue and work on the processed files.
Generally, CPython executes only one thread at a time. A Python process cannot run threads in parallel, but it can run them concurrently by switching between threads during I/O-bound operations. This limitation is enforced by the Global Interpreter Lock (GIL), which prevents threads within the same process from executing Python bytecode at the same time.
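To see what that means in practice, here is a tiny sketch (not part of your code; time.sleep just stands in for an I/O wait): three threads together finish in roughly one second rather than three, because each releases the GIL while it waits.

import threading
import time

def fake_io(n):
    time.sleep(1)  # releases the GIL while "waiting on I/O"
    print("task %d done" % n)

start = time.time()
threads = [threading.Thread(target=fake_io, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("elapsed: %.1f s" % (time.time() - start))  # roughly 1 s, not 3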
For example, I have a directory with many files, and I want to process all of them, but only 4 at a time in parallel.
That's exactly what a thread pool does: You create jobs, and the pool runs 4 at a time in parallel. You can make things even simpler by using an executor, where you just hand it functions (or other callables) and it hands you back futures for the results. You can build all of this yourself, but you don't have to.*
The stdlib's concurrent.futures module is the easiest way to do this. (For Python 3.1 and earlier, see the backport.) In fact, one of the main examples is very close to what you want to do. But let's adapt it to your exact use case:
import concurrent.futures
import glob

def process_all_files(d):
    files = glob.glob(d + '/*')
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        fs = [executor.submit(process_file, file) for file in files]
        concurrent.futures.wait(fs)
If you wanted process_file to return something, that's almost as easy:
def process_all_files(d):
    files = glob.glob(d + '/*')
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        fs = [executor.submit(process_file, file) for file in files]
        for f in concurrent.futures.as_completed(fs):
            do_something(f.result())
And if you want to handle exceptions too… well, just look at the example; it's just a try/except around the call to result().
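For instance, a minimal sketch of that, reusing the process_file and do_something names from above (catching Exception broadly here is just one reasonable choice):

import concurrent.futures
import glob

def process_all_files(d):
    files = glob.glob(d + '/*')
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        fs = [executor.submit(process_file, file) for file in files]
        for f in concurrent.futures.as_completed(fs):
            try:
                # result() re-raises any exception process_file raised in its worker thread
                do_something(f.result())
            except Exception as e:
                # one failed file doesn't stop the rest; report it and move on
                print("processing failed: %s" % e)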
* If you want to build them yourself, it's not that hard. The source to multiprocessing.pool is well written and commented, and not that complicated, and most of the hard stuff isn't relevant to threading; the source to concurrent.futures is even simpler.
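For the curious, a minimal hand-rolled sketch might look like this (worker and NUM_WORKERS are illustrative names, and process_file is the function from the question): four worker threads pull file names off a Queue, and a None sentinel tells each worker to stop.

import glob
import threading
import Queue  # named queue in Python 3

NUM_WORKERS = 4

def worker(q):
    while True:
        fname = q.get()
        if fname is None:
            break  # sentinel: no more work for this worker
        process_file(fname)

def process_all_files(d):
    q = Queue.Queue()
    threads = [threading.Thread(target=worker, args=(q,)) for _ in range(NUM_WORKERS)]
    for t in threads:
        t.start()
    for fname in glob.glob(d + '/*'):
        q.put(fname)
    for _ in threads:
        q.put(None)  # one sentinel per worker
    for t in threads:
        t.join()  # returns once every file has been processed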