 

Strange Queue.PriorityQueue behaviour with multiprocessing in Python 2.7.6

As the title says, I'm trying to use PriorityQueue with multiprocessing. More precisely, I want a PriorityQueue shared between processes. I wrote some code, but it doesn't behave as I expected.

Here is the code:

import time
from multiprocessing import Process, Lock
from Queue import PriorityQueue


def worker(queue):
    lock = Lock()
    with lock:
        for i in range(100):
            queue.put(i)

    print "worker", queue.qsize()


pr_queue = PriorityQueue()
worker_process = Process(target = worker, args = (pr_queue,))
worker_process.start()

time.sleep(5)    # nope, race condition, you shall not pass (probably)
print "main", pr_queue.qsize()

I got the following output:

worker 100
main 0

What's happening here, and what is the right way to do this? Thank you.

asked Aug 15 '14 at 10:08 by vortexxx192




1 Answer

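The problem in this case isn't pickling: on Unix-like platforms, Python 2.7's multiprocessing starts the child with fork(), so the queue is passed to the child without being pickled. (On Windows, I think you would get a pickling error here, though.) The root problem is that you're not using a process-safe queue. Queue.PriorityQueue is only thread-safe: after the fork, the child works on its own copy of the queue, so the 100 items it puts never reach the parent's copy, which stays empty. (The Lock created inside worker doesn't help either, since each call creates a fresh lock that no other process ever sees.) The only queues that can be shared between processes are the ones in the multiprocessing module, and unfortunately it has no PriorityQueue. However, you can easily create one by registering Queue.PriorityQueue with a multiprocessing manager class (here, a subclass of SyncManager), like this: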

from multiprocessing import Process
from multiprocessing.managers import SyncManager
from Queue import PriorityQueue


class MyManager(SyncManager):
    pass

# Register Queue.PriorityQueue so the manager can create shared instances of it.
MyManager.register("PriorityQueue", PriorityQueue)


def Manager():
    # Start a manager server process that will own the real PriorityQueue.
    m = MyManager()
    m.start()
    return m


def worker(queue):
    # `queue` is a proxy: every put() is forwarded to the manager process.
    for i in range(100):
        queue.put(i)
    print "worker", queue.qsize()


if __name__ == "__main__":  # required on Windows, good practice everywhere
    m = Manager()
    pr_queue = m.PriorityQueue()  # This is process-safe
    worker_process = Process(target=worker, args=(pr_queue,))
    worker_process.start()
    worker_process.join()  # wait for the worker instead of sleeping
    print "main", pr_queue.qsize()

Output:

worker 100
main 100
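The output above only shows that the item count is shared. Here is a small sketch, assuming the same SyncManager registration as the answer, that also checks priority order across processes: a child puts (priority, task) tuples in unsorted order and the parent reads them back. Queue.PriorityQueue returns the lowest-valued entry first, so a smaller number means higher priority. The names producer and drain_in_priority_order and the sample tuples are made up for illustration; the import fallback lets the sketch run on Python 2 and 3.

```python
from multiprocessing import Process
from multiprocessing.managers import SyncManager

try:
    from Queue import PriorityQueue  # Python 2
except ImportError:
    from queue import PriorityQueue  # Python 3


class MyManager(SyncManager):
    pass

# Same idea as the answer: the manager process owns a real PriorityQueue.
MyManager.register("PriorityQueue", PriorityQueue)


def producer(pq):
    # Hypothetical sample data: (priority, task), lower number = more urgent.
    for item in [(3, "low"), (1, "urgent"), (2, "normal")]:
        pq.put(item)  # each call is forwarded to the manager process


def drain_in_priority_order():
    manager = MyManager()
    manager.start()
    try:
        pq = manager.PriorityQueue()
        p = Process(target=producer, args=(pq,))
        p.start()
        p.join()  # all puts have completed once the producer exits
        results = []
        while not pq.empty():
            results.append(pq.get())
        return results
    finally:
        manager.shutdown()


if __name__ == "__main__":
    ordered = drain_in_priority_order()
    print(ordered)
```

Even though (1, "urgent") is put second, it comes out first, because the ordering is done by the single PriorityQueue living in the manager process.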

Note that this probably won't perform quite as well as a standard multiprocessing.Queue would. The Manager-based PriorityQueue works by starting a Manager server process that holds an ordinary PriorityQueue, and giving your main and worker processes proxy objects that use IPC to read from and write to the queue in the server process, so every put() or get() is a round trip to that process. A regular multiprocessing.Queue just writes data to and reads data from a Pipe. If that's a concern, you could try implementing your own multiprocessing.PriorityQueue by subclassing or delegating to multiprocessing.Queue. It may not be worth the effort, though.
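One possible reading of the "delegating" suggestion, sketched under strong assumptions: producers put (priority, item) tuples onto a plain multiprocessing.Queue, and a single consumer wraps that queue in a hypothetical HeapReader class that drains whatever has already arrived into a local heapq heap and returns the smallest entry. HeapReader, producer and the sample tuples are invented for this example; multiprocessing itself provides no such class.

```python
import heapq
from multiprocessing import Process, Queue

try:
    from Queue import Empty  # Python 2
except ImportError:
    from queue import Empty  # Python 3


class HeapReader(object):
    """Hypothetical single-consumer wrapper: transport is delegated to a
    multiprocessing.Queue, priority order is kept in a local heap."""

    def __init__(self, mp_queue):
        self._queue = mp_queue
        self._heap = []

    def _drain(self, block):
        # Optionally wait for one item, then move everything that has
        # already arrived through the pipe into the local heap.
        try:
            if block:
                heapq.heappush(self._heap, self._queue.get())
            while True:
                heapq.heappush(self._heap, self._queue.get_nowait())
        except Empty:
            pass

    def get(self):
        # Block on the pipe only when nothing is buffered locally.
        self._drain(block=not self._heap)
        return heapq.heappop(self._heap)


def producer(q):
    # Hypothetical sample data: (priority, task), lower number = more urgent.
    for item in [(3, "low"), (1, "urgent"), (2, "normal")]:
        q.put(item)


if __name__ == "__main__":
    q = Queue()
    p = Process(target=producer, args=(q,))
    p.start()
    p.join()  # OK here only because a few small items fit in the pipe buffer
    reader = HeapReader(q)
    ordered = [reader.get() for _ in range(3)]
    print(ordered)
```

The trade-off: priority is only enforced among items that have already reached the consumer, so an urgent item that arrives slightly later can still come out after a less urgent one. It also assumes exactly one consumer, since each reader's heap is private to its process. With larger payloads, drain the queue before joining the producer, as the multiprocessing documentation advises for processes that use queues.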

answered Oct 08 '22 at 08:10 by dano
