I have a big Python array that I would like to break into chunks, perform a calculation on each chunk, and then reassemble into one array. Below is what I have so far. I'm just starting to learn threading in general and threading with Python.

    import numpy as np
    from threading import Thread

    def performCalc(binaryArray):
        # perform some operation (blah is a placeholder)
        rArray = blah * binaryArray
        return rArray

    def main(argv):
        numberOfThreads = 5
        str(len(grey_arr))  # 100,000 elements
        greyScaleChunks = np.array_split(grey_arr, numberOfThreads)
        for i in range(numberOfThreads):
            t = Thread(target=performCalc, args=(greyScaleChunks[i],))
            t.start()
        # take all the return values and later create one big array to be resized into matrix.

The ordering of the chunks is important and I have to maintain that.
If you want to solve it with explicit Thread objects, and you want to get the results of the thread functions, you need to hold onto those Thread objects so you can later join them and pull out their results. Like this:

    ts = []
    for i in range(numberOfThreads):
        t = Thread(target=performCalc, args=(greyScaleChunks[i],))
        ts.append(t)
        t.start()
    for t in ts:
        t.join()
    # When you get here, all threads have finished

Also, the default implementation of Thread.run just calls your target and throws away the result. So you need to store the return value somewhere the main thread can access. Many numpy programs do this by passing in a pre-allocated array to each thread, so they can fill them in, and that isn't too huge a change to your design, but it's not the way you're headed. You can of course pass in any other mutable object to mutate. Or set a global variable, etc. But you've designed this around returning a value, and that's a nice way to think about things, so let's stick with that. The easiest way to make that work is to subclass Thread:
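
    import threading

    class ReturningThread(threading.Thread):
        # Relies on Thread's private _target/_args/_kwargs attributes.
        _result = None  # stays None if there is no target or the target raises

        def run(self):
            try:
                if self._target:
                    self._result = self._target(*self._args, **self._kwargs)
            finally:
                del self._target, self._args, self._kwargs

        def join(self):
            super().join()
            return self._result
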
This is untested code, but it should work. (I've done similar things in real code, but more complicated, to allow join to handle timeouts properly; here I kept it dead simple, just adding a _result = in the run method and returning it in join.)
So:

    ts = []
    for i in range(numberOfThreads):
        t = ReturningThread(target=performCalc, args=(greyScaleChunks[i],))
        ts.append(t)
        t.start()
    results = []
    for t in ts:
        results.append(t.join())

And now you have a list of arrays that you can stack together.
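To make that last step concrete, here is a minimal end-to-end sketch. It assumes the chunks are NumPy arrays, uses a stand-in `perform_calc` that just doubles its input (the real operation isn't shown in the question), and uses `np.concatenate` for the stacking. Because the threads are joined in the order they were created, the results come back in chunk order regardless of which thread finishes first.

```python
import threading

import numpy as np


class ReturningThread(threading.Thread):
    """Thread whose join() returns the target's return value.

    Relies on Thread's private _target/_args/_kwargs attributes.
    """

    _result = None  # stays None if there is no target or the target raises

    def run(self):
        try:
            if self._target:
                self._result = self._target(*self._args, **self._kwargs)
        finally:
            del self._target, self._args, self._kwargs

    def join(self):
        super().join()
        return self._result


def perform_calc(chunk):
    # Stand-in for the real calculation (an assumption for this sketch).
    return 2 * chunk


grey_arr = np.arange(100_000)         # placeholder data
chunks = np.array_split(grey_arr, 5)  # 5 chunks of 20,000 elements each

threads = [ReturningThread(target=perform_calc, args=(c,)) for c in chunks]
for t in threads:
    t.start()

# Joining in creation order keeps the results in chunk order.
results = [t.join() for t in threads]
reassembled = np.concatenate(results)
```

If the final goal is a matrix, `reassembled.reshape(...)` with the target shape would be the natural next step.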
However, what I did above is basically turn each thread into a half-assed future. It may be conceptually simpler to just use actual futures. This does mean that we're now using a thread pool that we don't really have any need for—there's exactly one task per thread. There's a probably-negligible performance cost (you're spending a lot more time on the actual work than the queueing, or you wouldn't want to thread this way in the first place), but, more importantly, we're adding significant extra complexity buried under the hood (in a well-tested stdlib module) for a bit less complexity in our code; whether or not that's worth it is up to you. Anyway:

    import concurrent.futures
    with concurrent.futures.ThreadPoolExecutor(max_workers=numberOfThreads) as x:
        results = x.map(performCalc, greyScaleChunks)

This handles creating 5 threads, creating a job for each performCalc(chunk), partitioning the 5 jobs out to the 5 threads, joining the threads, and gathering the 5 jobs' results in order, so all you have to do is stack up the results.
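Spelled out in full, that might look like the sketch below. `perform_calc` is again a stand-in for the real operation, and `np.concatenate` is one way to do the final stacking; both are assumptions, not part of the original code. Note that `map` returns a lazy iterator, so wrapping it in `list` makes it explicit that all results have been collected.

```python
import concurrent.futures

import numpy as np


def perform_calc(chunk):
    # Stand-in for the real calculation (an assumption for this sketch).
    return chunk + 1


number_of_threads = 5
grey_arr = np.arange(100_000)  # placeholder data
chunks = np.array_split(grey_arr, number_of_threads)

with concurrent.futures.ThreadPoolExecutor(max_workers=number_of_threads) as executor:
    # map() yields results in input order, not completion order.
    results = list(executor.map(perform_calc, chunks))

reassembled = np.concatenate(results)
```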
Another advantage of using an executor is that if it turns out your code isn't benefiting from thread-parallelism because of the GIL (unlikely to be a problem in your case—you should be spending most of your time in a numpy operation over 20000 rows, which will run with the GIL released—but obviously you have to test to verify that's true), you can switch to processes very easily: just change that ThreadPoolExecutor to a ProcessPoolExecutor and you're done.
It's possible that your args and returns can't be either copied or shared between processes the default way, or that doing so is so expensive that it kills all the benefits of parallelism—but the fact that you can test that with a one-word change, and then only deal with it if it's a problem, is still a win.
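If copying between processes does turn out to be the problem, the pre-allocated-array approach mentioned earlier is worth a look, since with threads nothing has to be copied at all. Here is a rough sketch, assuming a hypothetical `fill_chunk` worker that doubles its input. It leans on the fact that `np.array_split` returns views, so splitting the output array the same way as the input gives each thread its own writable window, and ordering is preserved automatically.

```python
import threading

import numpy as np


def fill_chunk(src, dst):
    # Hypothetical worker: writes its result directly into dst,
    # which is a view into the shared output array.
    np.multiply(src, 2, out=dst)


number_of_threads = 5
grey_arr = np.arange(100_000)  # placeholder data
out = np.empty_like(grey_arr)  # pre-allocated result array

# Split input and output identically; the pieces are views, not copies.
src_chunks = np.array_split(grey_arr, number_of_threads)
dst_chunks = np.array_split(out, number_of_threads)

threads = [
    threading.Thread(target=fill_chunk, args=(src, dst))
    for src, dst in zip(src_chunks, dst_chunks)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# out now holds the full result, already in the right order.
```

The trade-off is that the worker no longer returns a value, which is exactly the design change discussed above.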