Python multiprocessing Queue failure

I create 100 child processes:

proc_list = [Process(target=simulator, args=(result_queue,))
             for i in xrange(100)]

and start them:

for proc in proc_list: proc.start()

Each process does some processing and then puts 10,000 tuples into result_queue (an instance of multiprocessing.Queue):

def simulate(alg_instance, image_ids, gamma, results,
                     simulations, sim_semaphore):
  (rs, qs, t_us) =  alg_instance.simulate_multiple(image_ids, gamma,
                                             simulations)
  all_tuples = zip(rs, qs, t_us)
  for result in all_tuples:
    results.put(result)
  sim_semaphore.release()

I should (presumably) be getting 1,000,000 tuples in the queue, but across several runs I see sizes like these: 14912, 19563, 12952, 13524, 7487, 18350, 15986, 11928, 14281, 14282, 7317.

Any suggestions?

asked Jul 11 '12 by user1451817

People also ask

Is multiprocessing queue thread safe?

Yes. multiprocessing.Queue is both thread and process safe: multiple processes may call get() and put() on the same queue concurrently without fear of a race condition.
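A minimal sketch of that guarantee (Python 3; the names `worker` and `run_demo` are illustrative, not from the question). Note that the parent drains the queue before joining, since joining a process that still has buffered items to flush can block:

```python
from multiprocessing import Process, Queue

def worker(q, n):
    # Each worker puts n items on the shared queue concurrently.
    for i in range(n):
        q.put(i)

def run_demo(num_procs=4, items_each=100):
    q = Queue()
    procs = [Process(target=worker, args=(q, items_each))
             for _ in range(num_procs)]
    for p in procs:
        p.start()
    # Drain BEFORE joining: get() blocks until an item is available,
    # so this loop collects every item the workers produce.
    results = [q.get() for _ in range(num_procs * items_each)]
    for p in procs:
        p.join()
    return len(results)

if __name__ == "__main__":
    print(run_demo())
```

Every put() from every process is accounted for; no items are lost when the queue is drained by a live consumer.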

How does Python multiprocessing queue work?

A queue is a data structure to which items are added with put() and from which items are retrieved with get(). multiprocessing.Queue provides a first-in, first-out (FIFO) queue, which means items are retrieved in the order they were added.
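A tiny illustrative sketch of that FIFO ordering (the helper name `fifo_demo` is hypothetical):

```python
from multiprocessing import Queue

def fifo_demo():
    # Items come back in exactly the order they were put in.
    q = Queue()
    for item in ("a", "b", "c"):
        q.put(item)
    # get() blocks until the feeder thread has flushed each item.
    return [q.get() for _ in range(3)]

if __name__ == "__main__":
    print(fifo_demo())
```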

How do I stop a Python multiprocess?

A process can be killed by calling its terminate() method on the multiprocessing.Process object. The call terminates only the target process, not any child processes it may have spawned.
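A short sketch of terminate() (the function names are illustrative; on Unix the exit code of a terminated process is negative, the signal number negated):

```python
import time
from multiprocessing import Process

def sleeper():
    # Stand-in for long-running work we want to cut short.
    time.sleep(60)

def terminate_demo():
    p = Process(target=sleeper)
    p.start()
    p.terminate()   # kills only this process, not its children
    p.join()        # reap it so exitcode is populated
    return p.exitcode

if __name__ == "__main__":
    print(terminate_demo())
```

Because terminate() kills the process abruptly, any items it had buffered for a queue may be lost, which is one more reason to drain queues deliberately.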


1 Answer

My solution to multiprocessing issues is almost always to use Manager objects. While the exposed interface is the same, the underlying implementation is much simpler and has fewer bugs.

from multiprocessing import Manager
manager = Manager()
result_queue = manager.Queue()

Try it out and see if it doesn't fix your issues.
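A hedged sketch of the suggestion applied to the question's setup (the `simulator` body below is a stand-in for the real simulation, and the counts are scaled down). A manager queue lives in a separate server process, so there is no per-process feeder thread left holding buffered items when a worker exits:

```python
from multiprocessing import Manager, Process

def simulator(results, n):
    # Stand-in for the asker's simulate(): push n result tuples.
    for i in range(n):
        results.put((i, i * 2))

def run(num_procs=10, per_proc=100):
    manager = Manager()
    result_queue = manager.Queue()  # proxy queue served by the manager process
    procs = [Process(target=simulator, args=(result_queue, per_proc))
             for _ in range(num_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()  # safe here: every put() completed before the worker exited
    return result_queue.qsize()

if __name__ == "__main__":
    print(run())
```

With 10 processes putting 100 tuples each, all 1,000 tuples are present after the joins, unlike the lossy counts in the question.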

answered Sep 23 '22 by bukzor