
Using threading to slice an array into chunks and perform calculation on each chunk and reassemble the returned arrays into one array

I have a big Python array that I would like to break into chunks, perform a calculation on each chunk, and then reassemble the results into one array. Below is what I have so far. I'm just starting to learn threading, both in general and in Python.

import numpy as np
from threading import Thread

def performCalc(binaryArray):
    # perform some operation (blah is a placeholder for the real factor)
    rArray = blah * binaryArray
    return rArray

def main(argv):
    numberOfThreads = 5
    print(len(grey_arr))  # 100,000 elements
    greyScaleChunks = np.array_split(grey_arr, numberOfThreads)
    for i in range(numberOfThreads):
        t = Thread(target=performCalc, args=(greyScaleChunks[i],))
        t.start()

    # take all the return values and later create one big array to be resized into matrix.

The ordering of the chunks is important and I have to maintain that.

user2743 asked Oct 24 '25 12:10


1 Answer

If you want to solve it with explicit Thread objects, and you want to get the results of the thread functions, you need to hold onto those Thread objects so you can later join them and pull out their results. Like this:

ts = []
for i in range(numberOfThreads):
    t = Thread(target=performCalc, args=(greyScaleChunks[i],))
    ts.append(t)
    t.start()
for t in ts:
    t.join()
# When you get here, all threads have finished

Also, the default implementation of Thread.run just calls your target and throws away the result, so you need to store the return value somewhere the main thread can access. Many NumPy programs do this by passing each thread a pre-allocated array to fill in; that isn't too big a change to your design, but it's not the way you're headed. You could also pass in any other mutable object to mutate, or set a global variable. But you've designed this around returning a value, and that's a nice way to think about things, so let's stick with that. The easiest way to make that work is to subclass Thread:

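import threading

class ReturningThread(threading.Thread):
    # Relies on Thread's private _target, _args and _kwargs attributes.
    def run(self):
        # Default in case there is no target or the target raises
        self._result = None
        try:
            if self._target:
                self._result = self._target(*self._args, **self._kwargs)
        finally:
            del self._target, self._args, self._kwargs

    def join(self):
        # Wait for the thread to finish, then hand back what the target returned
        super().join()
        return self._result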

This is untested code, but it should work. (I've done similar things in real code, but more complicated, to allow join to handle timeouts properly; here I kept it dead simple, just adding a _result = in the run method and returning it in join.)

So:

ts = []
for i in range(numberOfThreads):
    t = ReturningThread(target=performCalc, args=(greyScaleChunks[i],))
    ts.append(t)
    t.start()
results = []
for t in ts:
    results.append(t.join())

And now you have a list of arrays that you can stack together.
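For comparison, the pre-allocated alternative mentioned earlier avoids the stacking step entirely, because each thread writes straight into its own slice of the final array. Here is a minimal sketch of that idea; `perform_calc_into`, the doubling calculation, and the `np.arange` input are stand-ins invented for illustration, not part of the original code.

```python
import threading
import numpy as np

def perform_calc_into(chunk, out):
    # Hypothetical variant of performCalc: writes into `out` instead of returning.
    out[:] = chunk * 2  # placeholder calculation

grey_arr = np.arange(100_000)          # stand-in for the real data
number_of_threads = 5
result = np.empty_like(grey_arr)       # pre-allocated output, same shape as the input

# Slice boundaries for the chunks; basic slices of an array are views,
# so writing to result[lo:hi] fills the matching part of `result`.
n = len(grey_arr)
bounds = [n * i // number_of_threads for i in range(number_of_threads + 1)]

threads = []
for lo, hi in zip(bounds[:-1], bounds[1:]):
    t = threading.Thread(target=perform_calc_into,
                         args=(grey_arr[lo:hi], result[lo:hi]))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
# `result` now holds every chunk's output, already in the original order.
```

Since the slices do not overlap, the threads never write to the same elements, and the ordering requirement is met by construction.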


However, what I did above is basically turn each thread into a half-assed future. It may be conceptually simpler to just use actual futures. This does mean that we're now using a thread pool that we don't really have any need for—there's exactly one task per thread. There's a probably-negligible performance cost (you're spending a lot more time on the actual work than the queueing, or you wouldn't want to thread this way in the first place), but, more importantly, we're adding significant extra complexity buried under the hood (in a well-tested stdlib module) for a bit less complexity in our code; whether or not that's worth it is up to you. Anyway:

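import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=numberOfThreads) as x:
    # map returns a lazy iterator; list() collects the results in input order
    results = list(x.map(performCalc, greyScaleChunks))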

This handles creating 5 threads, creating a job for each performCalc(chunk), partitioning the 5 jobs out to the 5 threads, joining the threads, and gathering the 5 jobs' results in order, so all you have to do is stack up the results.
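To illustrate the "in order" part, here is a small sketch in which the earlier chunks deliberately take longer than the later ones. The `delay` argument, the doubling calculation, and the `np.arange` input are assumptions added purely for the demonstration; `Executor.map` still yields results in the order of the inputs, not in the order the jobs finish.

```python
import concurrent.futures
import time
import numpy as np

def perform_calc(chunk, delay):
    time.sleep(delay)      # simulate uneven run times (illustration only)
    return chunk * 2       # placeholder calculation

grey_arr = np.arange(100_000)                  # stand-in for the real data
number_of_threads = 5
chunks = np.array_split(grey_arr, number_of_threads)
delays = [0.2, 0.15, 0.1, 0.05, 0.0]           # first chunk finishes last

with concurrent.futures.ThreadPoolExecutor(max_workers=number_of_threads) as pool:
    # map accepts several iterables and passes one item from each per call
    results = list(pool.map(perform_calc, chunks, delays))

combined = np.concatenate(results)             # chunks joined in input order
```

From here, `combined` can be reshaped into whatever matrix shape the rest of the program needs.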


Another advantage of using an executor is that if it turns out your code isn't benefiting from thread-parallelism because of the GIL, you can switch to processes very easily: just change that ThreadPoolExecutor to a ProcessPoolExecutor and you're done. (The GIL is unlikely to be a problem in your case: you should be spending most of your time in a numpy operation over 20,000 elements per chunk, which will run with the GIL released. But obviously you have to test to verify that's true.)

It's possible that your args and returns can't be either copied or shared between processes the default way, or that doing so is so expensive that it kills all the benefits of parallelism—but the fact that you can test that with a one-word change, and then only deal with it if it's a problem, is still a win.
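One way to make that swap easy to test is to pass the executor class in as a parameter, as in the sketch below. The helper name `run_chunks`, the doubling calculation, and the `np.arange` input are hypothetical. Two things are assumed for the process version: the work function is defined at module level so it can be pickled, and the script entry point is guarded with `if __name__ == "__main__":` so worker processes can import the module safely.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import numpy as np

def perform_calc(chunk):
    # Module-level function, so it can be sent to worker processes.
    return chunk * 2  # placeholder calculation

def run_chunks(executor_cls, data, workers=5):
    # Hypothetical helper: identical code path for threads and processes.
    chunks = np.array_split(data, workers)
    with executor_cls(max_workers=workers) as pool:
        return np.concatenate(list(pool.map(perform_calc, chunks)))

if __name__ == "__main__":
    data = np.arange(100_000)
    with_threads = run_chunks(ThreadPoolExecutor, data)
    with_processes = run_chunks(ProcessPoolExecutor, data)
    # Both executors should produce the same array
    print(np.array_equal(with_threads, with_processes))
```

Timing the two calls on the real data is the simplest way to find out whether the cost of copying chunks between processes outweighs the gain.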

abarnert answered Oct 27 '25 00:10


