Python's multiprocessing module provides two kinds of queues: Queue and JoinableQueue.
What is the difference between them?
    from multiprocessing import Queue

    q = Queue()
    q.put(item)     # Put an item on the queue
    item = q.get()  # Get an item from the queue

    from multiprocessing import JoinableQueue

    q = JoinableQueue()
    q.task_done()  # Signal task completion
    q.join()       # Wait for completion
queue is a built-in Python module that implements a queue. queue.Queue(maxsize) creates a queue that holds at most maxsize items. A maxsize of zero (0) means the queue has no size limit.
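As a small sketch of how maxsize behaves (the helper name fill is made up for this illustration), a bounded queue rejects non-blocking puts once it is full, while a queue created with maxsize=0 keeps accepting items:

```python
import queue


def fill(q, n):
    """Try to add n items without blocking; return how many actually fit."""
    added = 0
    for i in range(n):
        try:
            q.put_nowait(i)
        except queue.Full:
            # The queue already holds maxsize items.
            break
        added += 1
    return added


bounded = queue.Queue(maxsize=2)
unbounded = queue.Queue(maxsize=0)  # 0 (the default) means no size limit

print(fill(bounded, 5))    # stops once the queue holds maxsize items
print(fill(unbounded, 5))  # never raises queue.Full
```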
Queues are thread and process safe.
A queue is a data structure to which items can be added by a call to put() and from which items can be retrieved by a call to get(). multiprocessing.Queue provides a first-in, first-out (FIFO) queue, which means that items are retrieved from the queue in the order they were added.
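A minimal sketch of the FIFO behaviour, using a single process for simplicity (round_trip is a hypothetical helper, not part of the library):

```python
import multiprocessing as mp


def round_trip(items):
    """Put items on a multiprocessing.Queue, then read them back."""
    q = mp.Queue()
    for item in items:
        q.put(item)
    # get() blocks until each item has been delivered, so this returns
    # the items in the same order they were put.
    return [q.get() for _ in items]


if __name__ == "__main__":
    print(round_trip(["a", "b", "c"]))
```

With several producer processes, the order is only guaranteed per producer; items from different producers may interleave.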
JoinableQueue has the methods join() and task_done(), which Queue doesn't have.
class multiprocessing.Queue( [maxsize] )
Returns a process shared queue implemented using a pipe and a few locks/semaphores. When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe.
The usual queue.Empty and queue.Full exceptions from the standard library's queue module (named Queue in Python 2, hence Queue.Empty and Queue.Full in older documentation) are raised to signal timeouts.
Queue implements all the methods of queue.Queue except for task_done() and join().
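In Python 3 these exceptions are imported from the queue module even when the queue itself comes from multiprocessing. The sketch below shows one way to handle both timeouts; get_or_default and try_put are hypothetical helper names, and the timeout values are arbitrary:

```python
import multiprocessing as mp
import queue  # multiprocessing.Queue raises this module's Empty and Full


def get_or_default(q, default=None, timeout=0.1):
    """Return the next item, or default if nothing arrives in time."""
    try:
        return q.get(timeout=timeout)
    except queue.Empty:
        return default


def try_put(q, item, timeout=0.1):
    """Return True if the item was queued, False if the queue stayed full."""
    try:
        q.put(item, timeout=timeout)
        return True
    except queue.Full:
        return False


if __name__ == "__main__":
    q = mp.Queue(maxsize=1)
    print(try_put(q, "first"))             # the queue has room for one item
    print(try_put(q, "second"))            # times out: the queue is full
    print(get_or_default(q, timeout=1.0))  # fetches the queued item
    print(get_or_default(q, "nothing"))    # times out: the queue is empty
```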
class multiprocessing.JoinableQueue( [maxsize] )
JoinableQueue, a Queue subclass, is a queue which additionally has task_done() and join() methods.
task_done()
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises a ValueError if called more times than there were items placed in the queue.
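The bookkeeping described here can be observed in a single process. This is only an illustrative sketch (drain and extra_task_done_fails are made-up helper names); real consumers would normally run in separate processes:

```python
import multiprocessing as mp


def drain(q, n):
    """Fetch n items, calling task_done() once for each get()."""
    seen = []
    for _ in range(n):
        seen.append(q.get())
        q.task_done()
    return seen


def extra_task_done_fails(q):
    """Return True if one more task_done() raises ValueError."""
    try:
        q.task_done()
    except ValueError:
        return True
    return False


if __name__ == "__main__":
    q = mp.JoinableQueue()
    for n in range(3):
        q.put(n)
    print(drain(q, 3))
    q.join()  # does not block: every item has been marked done
    print(extra_task_done_fails(q))
```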
join()
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
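A hedged sketch of the same counting with real worker processes follows. The names square, worker and run are hypothetical, and using None as a stop sentinel is an assumption of this example rather than anything the library requires:

```python
import multiprocessing as mp


def square(n):
    """Stand-in for real work (hypothetical)."""
    return n * n


def worker(tasks, results):
    """Consume tasks until the None sentinel arrives."""
    while True:
        item = tasks.get()
        try:
            if item is None:
                break
            results.put(square(item))
        finally:
            # Runs for every get(), including the sentinel, so the
            # unfinished-task count always returns to zero.
            tasks.task_done()


def run(items, n_workers=2):
    tasks = mp.JoinableQueue()
    results = mp.Queue()
    procs = [mp.Process(target=worker, args=(tasks, results))
             for _ in range(n_workers)]
    for p in procs:
        p.start()
    for item in items:
        tasks.put(item)
    tasks.join()  # unblocks once task_done() was called for every item
    # Drain the results before joining the workers. Sorted because the
    # order in which workers finish is not deterministic.
    out = sorted(results.get() for _ in items)
    for _ in procs:
        tasks.put(None)  # one sentinel per worker
    for p in procs:
        p.join()
    return out


if __name__ == "__main__":
    print(run([1, 2, 3, 4]))
```

The if __name__ == "__main__" guard matters on platforms that start child processes by re-importing the main module.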
If you use JoinableQueue then you must call JoinableQueue.task_done() for each task removed from the queue or else the semaphore used to count the number of unfinished tasks may eventually overflow, raising an exception.
Based on the documentation, it's hard to be sure that a Queue is actually empty. With JoinableQueue you can wait for the queue to empty by calling q.join(). In cases where you want to complete work in distinct batches where you do something discrete at the end of each batch, this could be helpful.
For example, perhaps you process 1000 items at a time through the queue, then send a push notification to a user that you've completed another batch. This would be challenging to implement with a normal Queue.
It might look something like:
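    import logging
    import multiprocessing as mp

    logger = logging.getLogger(__name__)

    BATCH_SIZE = 1000
    STOP_VALUE = 'STOP'

    # process, expensive_func, notify_users and URLS are assumed to be
    # defined elsewhere.

    def consume(q):
        # Keep reading until the STOP_VALUE sentinel arrives.
        for item in iter(q.get, STOP_VALUE):
            try:
                process(item)
            # Be very defensive about errors since they can corrupt pipes.
            except Exception as e:
                logger.error(e)
            finally:
                q.task_done()

    if __name__ == '__main__':
        n_consumers = mp.cpu_count()
        # A manager queue can be passed to pool workers as an argument;
        # a plain mp.JoinableQueue() cannot. The pool is sized so the
        # long-running consumers don't occupy every worker.
        with mp.Manager() as manager, mp.Pool(2 * n_consumers) as pool:
            q = manager.JoinableQueue()
            # Pull items off queue as fast as we can whenever they're ready.
            for _ in range(n_consumers):
                pool.apply_async(consume, (q,))
            for i in range(0, len(URLS), BATCH_SIZE):
                # Put `BATCH_SIZE` items in queue asynchronously.
                for url in URLS[i:i+BATCH_SIZE]:
                    pool.apply_async(expensive_func, (url,), callback=q.put)
                # Wait for the queue to empty.
                q.join()
                notify_users()
            # Stop the consumers so we can exit cleanly.
            for _ in range(n_consumers):
                q.put(STOP_VALUE)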
NB: I haven't actually run this code. If you pull items off the queue faster than you put them on, you might finish early. In that case this code sends an update AT LEAST every 1000 items, and maybe more often. For progress updates, that's probably ok. If it's important to be exactly 1000, you could use an mp.Value('i', 0) and check that it's 1000 whenever your join releases.
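One possible shape for that counter, as an untested-in-production sketch: record_processed and take_full_batch are hypothetical helper names, and subtracting a batch (rather than resetting to zero) is a design choice of this example so that extra finished items carry over to the next batch.

```python
import multiprocessing as mp

BATCH_SIZE = 1000


def record_processed(counter):
    """Increment the shared counter; call once per finished item."""
    # get_lock() guards the read-modify-write so increments from
    # different processes don't get lost.
    with counter.get_lock():
        counter.value += 1


def take_full_batch(counter, batch_size=BATCH_SIZE):
    """Return True and subtract one batch if a full batch is done."""
    with counter.get_lock():
        if counter.value >= batch_size:
            counter.value -= batch_size
            return True
        return False


if __name__ == "__main__":
    counter = mp.Value('i', 0)  # shared signed int, starts at 0
    for _ in range(BATCH_SIZE):
        record_processed(counter)
    print(take_full_batch(counter))
```

The consumer would call record_processed next to task_done(), and the main process would only notify users when take_full_batch returns True after a join.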