As you can tell from the title, I'm trying to use a PriorityQueue with multiprocessing. More precisely, I wanted to make a shared PriorityQueue, so I wrote some code, but it doesn't behave as I expected.
Look at the code:
import time
from multiprocessing import Process, Lock
from Queue import PriorityQueue

def worker(queue):
    lock = Lock()
    with lock:
        for i in range(100):
            queue.put(i)
        print "worker", queue.qsize()

pr_queue = PriorityQueue()
worker_process = Process(target = worker, args = (pr_queue,))
worker_process.start()
time.sleep(5) # nope, race condition, you shall not pass (probably)
print "main", pr_queue.qsize()
Got the following output:
worker 100
main 0
What's happening, and what is the right way to do this? Thank you.
The Python priority queue is built on the heapq module, which is basically a binary heap. The get() method dequeues the entry with the lowest value, which is treated as the highest priority. You can also insert (priority, item) tuples into the queue, so every task can be associated with a priority.
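For instance, here is a minimal sketch of the (priority, item) pattern with Queue.PriorityQueue (the task names are made up for illustration):

from Queue import PriorityQueue

pq = PriorityQueue()
# Lower values come out first, so priority 1 is served before 2 and 3.
pq.put((2, "write report"))
pq.put((1, "fix outage"))
pq.put((3, "clean desk"))

while not pq.empty():
    priority, task = pq.get()
    print priority, task  # 1 fix outage, then 2 write report, then 3 clean desk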
The multiprocessing Queue class is especially useful when passed as a parameter to a Process's target function, enabling that Process to consume data. Using the put() method we can insert data into the queue, and using get() we can retrieve items from it. See the following code for a quick example.
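The original example appears to be missing here, so this is a small sketch of that pattern, with an illustrative producer function:

from multiprocessing import Process, Queue

def producer(q):
    # put() is safe to call from a child process
    for i in range(3):
        q.put(i)

q = Queue()
p = Process(target=producer, args=(q,))
p.start()
for _ in range(3):
    print q.get()  # get() blocks until an item arrives
p.join()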
When using multiprocessing in Python, Pipes act as a communication channel. Pipes are helpful when you want to initiate communication between two processes. Pipe() returns two connection objects, one for each end of the pipe, which communicate via the send() and recv() methods.
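A quick sketch of the Pipe pattern (the message text is illustrative):

from multiprocessing import Process, Pipe

def child(conn):
    conn.send("hello from the child")  # send any picklable object
    conn.close()

parent_conn, child_conn = Pipe()
p = Process(target=child, args=(child_conn,))
p.start()
print parent_conn.recv()  # blocks until the child sends something
p.join()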
Multiprocessing is used to significantly speed up your program, especially if it has a lot of CPU-intensive tasks. In that case, multiple functions can run in parallel because each one can use a different CPU core, which in turn improves CPU utilization.
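As a toy illustration of that point, the busy-loop function below is made up; on a multi-core machine the four processes run on separate cores:

import time
from multiprocessing import Process

def burn(n):
    total = 0
    for i in xrange(n):  # pure CPU work, no I/O
        total += i * i

if __name__ == "__main__":  # guard required on Windows
    start = time.time()
    procs = [Process(target=burn, args=(10 ** 7,)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print "elapsed:", time.time() - start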
The problem isn't that it's not picklable in this case - if you're using a Unix-like platform, the queue can be passed to the child without pickling. (On Windows, I think you would get a pickling error here, though.) The root problem is that you're not using a process-safe queue. The only queues that can be used between processes are the Queue objects that live inside the multiprocessing module. Unfortunately, there is no PriorityQueue implementation available there. However, you can easily create one by registering a PriorityQueue with a multiprocessing Manager class, like this:
import time
from multiprocessing import Process
from multiprocessing.managers import SyncManager
from Queue import PriorityQueue

class MyManager(SyncManager):
    pass

MyManager.register("PriorityQueue", PriorityQueue)  # Register a shared PriorityQueue

def Manager():
    m = MyManager()
    m.start()
    return m

def worker(queue):
    print(queue)
    for i in range(100):
        queue.put(i)
    print "worker", queue.qsize()

m = Manager()
pr_queue = m.PriorityQueue()  # This is process-safe
worker_process = Process(target = worker, args = (pr_queue,))
worker_process.start()
time.sleep(5) # nope, race condition, you shall not pass (probably)
print "main", pr_queue.qsize()
Output:
worker 100
main 100
Note that this probably won't perform quite as well as a standard multiprocessing.Queue subclass would. The Manager-based PriorityQueue is implemented by creating a Manager server process which actually contains a regular PriorityQueue, and then providing your main and worker processes with Proxy objects that use IPC to read/write to the queue in the server process. Regular multiprocessing.Queues just write/read data to/from a Pipe. If that's a concern, you could try implementing your own multiprocessing.PriorityQueue by subclassing or delegating from multiprocessing.Queue. It may not be worth the effort, though.
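One hedged sketch of the delegation idea, not the answer's implementation: wrap a regular multiprocessing.Queue as the transport and keep a heapq-ordered buffer on the consumer side. Note this only preserves priority order among items that have already arrived at the consumer, so it is weaker than a true shared priority queue:

import heapq
from Queue import Empty
from multiprocessing import Queue

class NaivePriorityQueue(object):
    """Illustrative sketch: producers put (priority, item) tuples into
    a plain multiprocessing.Queue; the consumer drains arrivals into a
    local heap so that get() returns the smallest priority seen so far."""

    def __init__(self):
        self._transport = Queue()  # process-safe, but FIFO only
        self._heap = []            # consumer-side ordering buffer

    def put(self, item):
        self._transport.put(item)  # callable from any process

    def get(self):
        # Drain whatever has arrived so far into the local heap...
        while True:
            try:
                heapq.heappush(self._heap, self._transport.get_nowait())
            except Empty:
                break
        # ...then pop the smallest entry. Raises IndexError if nothing
        # has arrived yet, so real code would need to block or retry.
        return heapq.heappop(self._heap)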