
Python queues - have at most n threads running

The scenario:

I have a really large DB model migration under way for a new build, and I'm working out the boilerplate for how we will migrate current live data from a webapp into the local test databases.

I'd like to set up a Python script that will concurrently process the migration of my models. I have from_legacy and to_legacy methods for my model instances. What I have so far loads all my instances and creates a thread for each, with each thread subclassed from the threading module's Thread class and given a run method that just does the conversion and saves the result.

I'd like the main loop of the program to build a big stack of these thread instances and process them one by one, running at most 10 concurrently and feeding in the next one as others finish migrating.

What I can't figure out is how to use the Queue correctly to do this. If each thread represents the full task of migration, should I load all the instances first and then create a Queue with maxsize set to 10, and have that track only the currently running threads? Something like this perhaps?

currently_running = Queue(maxsize=10)  # put() blocks once 10 tasks are queued
for model in models:
  task = Migrate(model)  # Migrate is a Thread subclass
  currently_running.put(task)
  task.start()

In this case I would be relying on the put call to block while the queue is at capacity. If I were to go this route, how would I call task_done?

Or rather, should the Queue include all the tasks (not just the started ones) and use join to block to completion? Does calling join on a queue of threads start the included threads?

What is the best methodology to approach the "at most have N running threads" problem and what role should the Queue play?

asked Apr 13 '26 18:04 by DeaconDesperado



1 Answer

The multiprocessing.pool module has a ThreadPool class which, as its name implies, creates a pool of threads. It was undocumented for a long time, but it shares the same API as the multiprocessing.Pool class.
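As a small illustration of that shared API, here is a minimal sketch that calls map on a ThreadPool exactly as one would on a multiprocessing.Pool. The square function and the pool size of 4 are made up for the example and have nothing to do with the migration itself.

```python
from multiprocessing.pool import ThreadPool

def square(x):
    # Hypothetical stand-in for real work.
    return x * x

# ThreadPool accepts the same calls as multiprocessing.Pool, e.g. map().
# map() blocks until every input has been processed and keeps input order.
with ThreadPool(4) as pool:
    results = pool.map(square, range(5))

print(results)  # [0, 1, 4, 9, 16]
```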

You can then send tasks to the thread pool using pool.apply_async:

import multiprocessing.pool as mpool

def worker(task):
    # work on task
    print(task)     # substitute your migration code here.

# create a pool of 10 threads
pool = mpool.ThreadPool(10)
N = 100

for task in range(N):
    pool.apply_async(worker, args=(task,))

pool.close()
pool.join()
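If you would rather use a Queue directly, as the question asks, one common arrangement is sketched below. It is not the ThreadPool approach above but a hand-rolled equivalent, and it rests on some assumptions: the queue holds all pending tasks (not the running threads), a fixed set of 10 worker threads pulls from it, and migrate is a hypothetical placeholder for the real conversion-and-save step. Each worker calls task_done once per get. Queue.join only waits for those calls and never starts any threads itself.

```python
import queue
import threading

NUM_WORKERS = 10            # at most this many tasks run at once
tasks = queue.Queue()       # holds every pending task, not the threads
done = []                   # results collected by the workers
done_lock = threading.Lock()

def migrate(item):
    # Hypothetical placeholder for the real conversion-and-save step.
    return item * 2

def worker():
    while True:
        item = tasks.get()
        if item is None:            # sentinel: no more work for this thread
            tasks.task_done()
            break
        try:
            result = migrate(item)
            with done_lock:
                done.append(result)
        except Exception as exc:    # keep the worker alive if one task fails
            print(f"task {item!r} failed: {exc}")
        finally:
            tasks.task_done()       # exactly one call per get()

# The threads are started explicitly; the queue never starts them.
threads = [threading.Thread(target=worker) for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()

for item in range(1, 101):
    tasks.put(item)
for _ in threads:
    tasks.put(None)                 # one sentinel per worker

tasks.join()                        # returns once every item has had task_done()
for t in threads:
    t.join()
```

Because the number of worker threads is fixed, the "at most N running" limit comes from the thread count rather than from the queue's maxsize.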
answered Apr 16 '26 09:04 by unutbu
