Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Multiprocessing pool and queues

I am using multiprocessing with pools. I need to pass a structure as argument to a function that has to be used in separate processes. I am facing an issue with the mapping functions of the multiprocessing.Pool, since I cannot duplicate neither Pool.Queue, nor Pool.Array. This structure is to be used on the fly to log the result of each terminated process. Here is my code:

import multiprocessing
from multiprocessing import Process, Manager, Queue, Array
import itertools
import time

def do_work(number, out_queue=None):
    if out_queue is not None:
        print "Treated nb ", number
        out_queue.append("Treated nb " + str(number))
    return 0


def multi_run_wrapper(iter_values):
    return do_work(*iter_values)

def test_pool():
    # Get the max cpu
    nb_proc = multiprocessing.cpu_count()

    pool = multiprocessing.Pool(processes=nb_proc)
    total_tasks = 16
    tasks = range(total_tasks)

    out_queue= Queue()  # Use it instead of out_array and change out_queue.append() into out_queue.put() in the do_work() function.
    out_array = Array('i', total_tasks)
    iter_values = itertools.izip(tasks, itertools.repeat(out_array))
    results = pool.map_async(multi_run_wrapper, iter_values)

    pool.close()
    pool.join()
    print results._value
    while not out_queue.empty():
        print "queue: ", out_queue.get()
    print "out array: \n", out_array

if __name__ == "__main__":
    test_pool()

I need to launch a worker in a detached process and to pass my output queue as argument. I also want to specify the pool containing a limited number of running processes. For that I am using the pool.map_async() function. Unfortunately the piece of code above gives me an error:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 808, in __bootstrap_inner
    self.run()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 761, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
    put(task)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 77, in __getstate__
    assert_spawning(self)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/forking.py", line 52, in assert_spawning
    ' through inheritance' % type(self).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance

I believe it is because a Queue cannot be copied, ever, as I read in the doc. Then I thought of making the queue a global variable so that I would not need to pass it anynmore, but that would be so messy in my opinion. I also thought of using a multiprocessing.Array instead

out_array = Array('i', total_tasks)

but the same error would be risen as with queues:

# ...
RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance

I need to use this feature - use of multiprocessing and exchanging informations from subprocesses - in a relatively big software so I want my code to remain clean and tidy.

How can I pass the queue to my worker in an elegant way?

Of course, any other way of dealing with the main specification is welcome.

like image 763
kaligne Avatar asked Jun 12 '26 04:06

kaligne


1 Answers

multiprocessing.Pool will not accept a multiprocessing.Queue as an argument in its work queue. I believe this is because it internally uses queues to send data back and forth to the worker processes. There are a couple workarounds:

1) Do you really need to use a queue? One advantage of the Pool function is that their return values are sent back to the main processes. It is generally better to iterate over the return values from a pool than to use a separate queue. This also avoids the race condition introduce by checking queue.empty()

2) If you must use a Queue, you can use one from multiprocessing.Manager. This is a proxy to a shared queue which can be passed as an argument to the Pool functions.

3) You can pass a normal Queue to worker processes by using an initializer when creating the Pool(like https://stackoverflow.com/a/3843313). This is kinda hacky.

The race condition I mentioned above comes from:

while not out_queue.empty():
    print "queue: ", out_queue.get()

When you have worker processes filling your queue, you can have the condition where your queue is currently empty because a worker is about to put something into it. If you check .empty() at this time you will end early. A better method is to put sentinal values in your queue to signal when you are finished putting data into it.

like image 133
bj0 Avatar answered Jun 13 '26 18:06

bj0



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!