
Multiprocessing a loop of a function that writes to an array in python

I'm trying to implement multiprocessing for this loop. It fails to modify the array, and it does not seem to run the jobs in order (it returns the array before the last function call has finished).

import multiprocessing
import numpy


def func(i, array):
    array[i] = i**2
    print(i**2)

def main(n):
    array = numpy.zeros(n)

    if __name__ == '__main__':
        jobs = []
        for i in range(0, n):
            p = multiprocessing.Process(target=func, args=(i, array))
            jobs.append(p)
            p.start()

    return array

print(main(10))
Tom asked Dec 18 '22

2 Answers

Processes do not share memory. Your program initially creates an array full of zeroes, then starts 10 processes, each of which calls func on a copy of the array as it existed when the process was spawned, never on the original array.

It seems like what you're really trying to accomplish is this:

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Array


def modify_array(index, sharedarray):
    sharedarray[index] = index ** 2
    print([x for x in sharedarray])


def main(n):
    lock = Lock()
    array = Array('i', n, lock=lock)
    if __name__ == '__main__':
        for i in range(0, n):
            p = Process(target=modify_array, args=(i, array))
            p.start()
            p.join()
    return list(array)

main(10)

Output:

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[0, 1, 0, 0, 0, 0, 0, 0, 0, 0]
[0, 1, 4, 0, 0, 0, 0, 0, 0, 0]
[0, 1, 4, 9, 0, 0, 0, 0, 0, 0]
[0, 1, 4, 9, 16, 0, 0, 0, 0, 0]
[0, 1, 4, 9, 16, 25, 0, 0, 0, 0]
[0, 1, 4, 9, 16, 25, 36, 0, 0, 0]
[0, 1, 4, 9, 16, 25, 36, 49, 0, 0]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 0]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

But the problem is that using multiprocessing here is arguably misguided. Spawning an additional process carries far more overhead than starting a new thread, or than simply staying single-threaded and using an event loop to trigger actions.
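To illustrate that point: threads share the parent's memory, so the original code works almost unchanged with the threading module in place of multiprocessing. This is a minimal sketch, not a drop-in recommendation (per-element writes to a NumPy array from threads are fine here because each thread touches a distinct index):

```python
import threading
import numpy


def func(i, array):
    # Threads see the parent's array, so this write is visible to the caller.
    array[i] = i ** 2


def main(n):
    array = numpy.zeros(n)
    threads = [threading.Thread(target=func, args=(i, array)) for i in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait for every thread before returning
    return array


print(main(10))
```

Joining all threads before returning also fixes the ordering complaint from the question: the array is only returned once every write has completed.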

An example of using concurrency within a single-threaded, single-process Python program might look like the following:

import numpy as np
from asyncio import get_event_loop, wait, ensure_future


def modify_array(index, array):
    array[index] = index ** 2
    print([x for x in array])


async def task(loop, function, index, array):
    await loop.run_in_executor(None, function, index, array)


def main(n):
    loop = get_event_loop()
    jobs = list()
    array = np.zeros(n)
    for i in range(0, n):
        jobs.append(
            ensure_future(
                task(loop, modify_array, i, array)
            )
        )
    loop.run_until_complete(wait(jobs))
    loop.close()

main(10)

This is a popular pattern these days: using an asyncio event loop to run tasks concurrently. However, since you're using a library such as NumPy, I question how valuable this pattern will be for you.
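If you do want real processes and NumPy together, one option is to view a shared multiprocessing.Array as a NumPy array with numpy.frombuffer. A hedged sketch (the function names are my own, and this assumes a platform where the children can receive the shared Array, which the multiprocessing module handles for you):

```python
import multiprocessing
import numpy


def func(i, shared):
    # Re-wrap the shared buffer as a NumPy array inside the child process.
    arr = numpy.frombuffer(shared.get_obj(), dtype=numpy.float64)
    arr[i] = i ** 2


def main(n):
    # 'd' is the ctypes code for C double, which matches numpy.float64.
    shared = multiprocessing.Array('d', n)
    jobs = [multiprocessing.Process(target=func, args=(i, shared)) for i in range(n)]
    for p in jobs:
        p.start()
    for p in jobs:
        p.join()
    return numpy.frombuffer(shared.get_obj(), dtype=numpy.float64)


if __name__ == '__main__':
    print(main(10))
```

Because every child writes into the same underlying buffer, the parent sees all the results after joining, without any pickling of the array itself.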

Vasili Syrakis answered May 06 '23

I've not used multiprocessing before so I'm new to this too, but after doing a little research (mainly from these two posts), I think I've partially managed to address your problem with this code:

import multiprocessing
import numpy


def func(i, array, connection):
    squared_value = i ** 2
    array[i] = squared_value
    print(squared_value)

    connection.send(array)


def main(n):
    array = numpy.zeros(n)

    for i in range(0, n):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=func, args=(i, array, send_end))
        p.start()
        p.join()
        array = recv_end.recv()

    return array


if __name__ == '__main__':
    print(main(10))

Output

0
1
4
9
16
25
36
49
64
81
[  0.   1.   4.   9.  16.  25.  36.  49.  64.  81.]

The reason why this approach modifies the array and yours doesn't is explained in this answer that I referenced in the comments:

The problem is that when the objects are passed to the worker processes, they are packed up with pickle, shipped to the other process, where they are unpacked and worked on. Your objects aren't so much passed to the other process as cloned. You don't return the objects, so the cloned objects are happily modified, and then thrown away.

There are a few things I should point out about my (partial) solution:

  • This implementation runs a lot slower than just generating this list the regular way (via a single thread). This is most likely due to the added overhead of creating new processes and marshalling data between them.

  • Because each job in this design modifies the whole array, each job must take as input the output of the previous job. Given that constraint, I don't believe it's possible to have the jobs run concurrently, which sort of defeats the point of multiprocessing.

As for the latter of those two points, I did try a variation where func returns a function that accepts an array and returns a modified version of it. This would have allowed the jobs to run concurrently, but unfortunately it doesn't seem like locally defined functions can be pickled (only module-level functions can).
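One hedged alternative to chaining the whole array through each job: since each index is computed independently here, the workers can return just their own value and let the parent assemble the array. A sketch using multiprocessing.Pool (my own function names):

```python
import multiprocessing
import numpy


def square(i):
    # Each worker returns a plain value; nothing mutable crosses
    # the process boundary, so the pickling copy problem goes away.
    return i ** 2


def main(n):
    with multiprocessing.Pool() as pool:
        # map preserves input order, so the results line up with the indices.
        return numpy.array(pool.map(square, range(n)), dtype=float)


if __name__ == '__main__':
    print(main(10))
```

This lets the jobs actually run concurrently, because no job depends on another's output.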

Tagc answered May 06 '23