So I have a tool that takes a list of items, splits it into a given number of lists (let's say 10), and then spawns off 10 threads, "EvaluationThreads" (extending threading.Thread), one per list; each of those threads evaluates whatever it was supplied to evaluate. As I start each thread I put it into a list, and after spawning them all off I have the following code:
for th in threadList:
    th.join()
    someTotal = th.resultsAttribute
And that's how I handle waiting for all the threads to finish and gathering their information. While this works, I feel like there must be a more elegant way: these threads could very well finish at different times, and if the first one to start finishes last, all the ones that finished earlier must wait for that thread before they can be joined. Is there a way to get these threads' information and join them as they finish, rather than in the order they were started? I originally figured I would use some sort of callbacks in the threads, but I'm not sure if there is a more accepted solution.
Thanks for your help.
EDIT: To clarify, my evaluation function is not CPU-bound, and I'm not trying to distribute the documents among the threads to get them done as quickly as possible; each thread has a fixed, roughly even number of jobs.
For your main question:
If you're doing something more complex than this—or, in particular, if you're doing this repeatedly—you probably want a "thread group" class. There are dozens of them pre-made, but it's pretty trivial to write one yourself if you don't like any of them.
Then, instead of this:
threadList = []
for argchunk in splitIntoChunks(values, 10):
    threadList.append(threading.Thread(target=myThreadFunc, args=argchunk))
...
someTotal = 0
for th in threadList:
    th.join()
    someTotal += th.resultsAttribute
You can do this:
threadGroup = ThreadGroup.ThreadGroup()
for argchunk in splitIntoChunks(values, 10):
    threadGroup.newThread(myThreadFunc, argchunk)
threadGroup.join()
someTotal = sum(th.resultsAttribute for th in threadGroup)
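Writing such a ThreadGroup class yourself really is trivial. Here's one possible sketch; the newThread/join/iteration API above is assumed, and the threads here take a plain target function rather than carrying a resultsAttribute:

```python
import threading

class ThreadGroup(object):
    """Minimal thread group: spawn threads, join them all, iterate them."""

    def __init__(self):
        self.threads = []

    def newThread(self, func, args):
        # start the thread immediately and remember it for later joining
        th = threading.Thread(target=func, args=args)
        self.threads.append(th)
        th.start()
        return th

    def join(self):
        # wait for every thread in the group to finish
        for th in self.threads:
            th.join()

    def __iter__(self):
        # allows "for th in threadGroup" after joining
        return iter(self.threads)
```

If you want the resultsAttribute style shown above, you'd have newThread accept an already-constructed Thread subclass instance instead of a target function.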
Or, maybe even better, a full thread pool library, so you can do this:
pool = ThreadPool(10)
for argchunk in splitIntoChunks(values, 100):
    pool.putRequest(myThreadFunc, argchunk)
pool.wait()
The advantage here is that you can just as easily have 100 jobs scheduled as appropriate on 10 threads, instead of 10 jobs one per thread, without all the work of maintaining a queue, etc. The disadvantage is that you can't just iterate threads to get the return values, you have to iterate jobs—and ideally, you don't want to keep the jobs alive until the end just so you can iterate them.
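That ThreadPool is a sketch of a hypothetical API. If you'd rather not pick a third-party library, the standard concurrent.futures module (Python 3.2+, backported to 2.x as the futures package) gives you the same shape, and its as_completed hands you each job's result as soon as it finishes, regardless of submission order, which answers the original question directly. A sketch, with myThreadFunc and splitIntoChunks as stand-ins for your real code:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def myThreadFunc(chunk):
    # stand-in for the real evaluation work
    return sum(chunk)

def splitIntoChunks(values, n):
    # hypothetical helper: deal values out into n roughly even chunks
    return [values[i::n] for i in range(n)]

values = list(range(100))

# 10 worker threads servicing 100 small jobs
with ThreadPoolExecutor(max_workers=10) as pool:
    futures = [pool.submit(myThreadFunc, chunk)
               for chunk in splitIntoChunks(values, 100)]
    # as_completed yields each future the moment it finishes,
    # not in the order the jobs were submitted
    someTotal = sum(f.result() for f in as_completed(futures))
```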
Which brings us to your second question, how to get values out of the threads (or jobs). There are many, many ways to do this.
What you've done works. You don't even need any locking.
Using callbacks, as you suggested, also works. But keep in mind that the callback will run on the worker thread, not the main thread, so if it's accessing some global object, you will need some kind of synchronization.
If you're going to synchronize anyway, there may not be any benefit to the callbacks. For example, if all you're trying to do is sum a bunch of values, you can just set total=[0], and have each thread just do total[0] += myValue inside a lock. (Of course in this case, it probably makes more sense to just do the summing in the main thread and avoid the lock, but if the work of amalgamating the results is much heftier, that choice may not be as simple.)
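A minimal sketch of that lock-and-shared-list approach; the chunks and the sum() work are placeholders for the real evaluation:

```python
import threading

total = [0]                       # one-element list so worker threads can mutate it
total_lock = threading.Lock()

def worker(chunk):
    myValue = sum(chunk)          # stand-in for the real evaluation
    with total_lock:              # synchronize access to the shared total
        total[0] += myValue

threads = [threading.Thread(target=worker, args=(chunk,))
           for chunk in ([1, 2, 3], [4, 5], [6])]
for th in threads:
    th.start()
for th in threads:
    th.join()
# total[0] now holds the combined result
```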
You can also use an atomic object of some kind instead of locking explicitly. For example, the standard Queue.Queue is thread-safe (and collections.deque's append and popleft are atomic too), so you can just create q = Queue.Queue(), have each thread push its result with q.put(myValue), and then after joining iterate the queue and sum up its values.
In fact, if each thread is pushing to the queue exactly once, you can just do 10 blocking gets on the queue itself, after which you know that the group.join() or pool.wait() or whatever will return quickly.
Or you can even push callbacks as jobs onto a queue. Again, you can do 10 blocking gets on the queue, executing the result each time.
If each thread can return multiple objects, they can push a sentinel value or callback onto the queue when they're done, and your main thread keeps popping until it reads 10 sentinels.
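Here's a sketch of that sentinel pattern, using the Python 3 module name (queue rather than Queue); each worker pushes a variable number of results and then one sentinel, and the main thread pops until it has seen one sentinel per worker:

```python
import threading
import queue  # "Queue" in Python 2

SENTINEL = object()  # unique marker meaning "this worker is done"

def worker(chunk, q):
    for item in chunk:
        q.put(item * 2)   # a worker may push any number of results...
    q.put(SENTINEL)       # ...then signals completion with a sentinel

q = queue.Queue()
chunks = [[1, 2], [3], [4, 5, 6]]
threads = [threading.Thread(target=worker, args=(chunk, q)) for chunk in chunks]
for th in threads:
    th.start()

results, done = [], 0
while done < len(threads):  # keep popping until one sentinel per thread
    item = q.get()
    if item is SENTINEL:
        done += 1
    else:
        results.append(item)

for th in threads:
    th.join()
```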
Use a Queue to push the information from your threads out as soon as it is available:
Let's say this is your thread:
class myThread(threading.Thread):
    def __init__(self, results_queue):
        threading.Thread.__init__(self)  # don't forget to init the base class
        self.results_queue = results_queue
        #other init code here
    def run(self):
        #thread code here
        self.results_queue.put(result) #result is the information you want from the thread
And this is your main code:
import Queue #or "import queue" in Python 3.x
results_queue = Queue.Queue()
#thread init code here
for i in xrange(num_threads_running):
    data = results_queue.get() # queue.get() blocks until some item is available
    #process data as it is made available
#at this point, there is no need to .join(), since each thread terminates as soon as it has put its data on the queue
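For completeness, here's a runnable Python 3 version of this pattern (sum(chunk) stands in for whatever your threads actually evaluate):

```python
import threading
import queue  # "Queue" in Python 2

class MyThread(threading.Thread):
    def __init__(self, chunk, results_queue):
        threading.Thread.__init__(self)
        self.chunk = chunk
        self.results_queue = results_queue

    def run(self):
        result = sum(self.chunk)        # stand-in for the real evaluation
        self.results_queue.put(result)  # publish as soon as it is ready

results_queue = queue.Queue()
chunks = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]
threads = [MyThread(chunk, results_queue) for chunk in chunks]
for th in threads:
    th.start()

someTotal = 0
for _ in range(len(threads)):
    someTotal += results_queue.get()  # blocks until the next result arrives
```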