 

Queue vs JoinableQueue in Python


In Python's multiprocessing module there are two kinds of queues:

  • Queue
  • JoinableQueue

What is the difference between them?

Queue

from multiprocessing import Queue

q = Queue()
q.put(item)     # Put an item on the queue
item = q.get()  # Get an item from the queue

JoinableQueue

from multiprocessing import JoinableQueue

q = JoinableQueue()
q.task_done()   # Signal task completion
q.join()        # Wait for completion
asked Jul 05 '15 by axcelenator


People also ask

What is queue.Queue() in Python?

queue is a built-in Python module used to implement a queue. queue.Queue(maxsize) initializes a queue with a maximum size of maxsize. A maxsize of zero means an infinite queue.
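For instance, a bounded queue refuses a third item (a minimal illustration, not from the original answer):

import queue

q = queue.Queue(maxsize=2)  # holds at most two items
q.put('a')
q.put('b')
q.put('c', block=False)     # raises queue.Full: the queue is at capacity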

Is a Queue process-safe in Python?

Yes. multiprocessing queues are both thread- and process-safe.

What is a queue in multiprocessing Python?

A queue is a data structure to which items are added with put() and from which items are retrieved with get(). multiprocessing.Queue provides a first-in, first-out (FIFO) queue, meaning items are retrieved in the order they were added.
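A tiny sketch of the FIFO ordering, runnable in a single process:

import multiprocessing as mp

q = mp.Queue()
for n in (1, 2, 3):
    q.put(n)
print(q.get(), q.get(), q.get())  # 1 2 3 -- first in, first out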


2 Answers

JoinableQueue has the methods join() and task_done(), which Queue lacks.


class multiprocessing.Queue([maxsize])

Returns a process shared queue implemented using a pipe and a few locks/semaphores. When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe.

The usual Queue.Empty and Queue.Full exceptions from the standard library’s Queue module are raised to signal timeouts.

Queue implements all the methods of Queue.Queue except for task_done() and join().
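One practical consequence of the feeder thread is that checks like empty() are only advisory. A timing-dependent sketch (the sleep duration is an arbitrary assumption, and the first print may vary from run to run):

import multiprocessing as mp
import time

q = mp.Queue()
q.put('x')
# The feeder thread may not have flushed the item into the pipe yet,
# so empty() can transiently report True right after put().
print(q.empty())   # unreliable this soon after put()
time.sleep(0.1)    # give the feeder thread time to flush
print(q.empty())   # False once the item has reached the pipe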


class multiprocessing.JoinableQueue([maxsize])

JoinableQueue, a Queue subclass, is a queue which additionally has task_done() and join() methods.

task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.
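A quick way to see that ValueError (one put, two task_done calls):

from multiprocessing import JoinableQueue

q = JoinableQueue()
q.put('job')
q.get()
q.task_done()   # matches the single put()
q.task_done()   # one call too many: raises ValueError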

join()

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.


If you use JoinableQueue then you must call JoinableQueue.task_done() for each task removed from the queue or else the semaphore used to count the number of unfinished tasks may eventually overflow, raising an exception.
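Putting the two methods together, a minimal runnable sketch of the pattern (the worker and its printed "work" are stand-ins for real processing):

import multiprocessing as mp

def worker(q):
    while True:
        item = q.get()
        print('processing', item)  # stand-in for real work
        q.task_done()              # one task_done() per get()

if __name__ == '__main__':
    q = mp.JoinableQueue()
    p = mp.Process(target=worker, args=(q,), daemon=True)
    p.start()
    for item in range(5):
        q.put(item)
    q.join()  # blocks until task_done() has been called for all five items
    # The daemon worker is cleaned up automatically when the parent exits.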

answered Sep 26 '22 by fferri


Based on the documentation it's hard to be sure that a Queue is actually empty: because of the feeder thread, empty() is only advisory. With JoinableQueue you can wait for the queue to empty by calling q.join(). In cases where you want to complete work in distinct batches, doing something discrete at the end of each batch, this can be helpful.

For example, perhaps you process 1000 items at a time through the queue, then send a push notification to a user that you've completed another batch. This would be challenging to implement with a normal Queue.

It might look something like:

import multiprocessing as mp

BATCH_SIZE = 1000
STOP_VALUE = 'STOP'

def consume(q):
    for item in iter(q.get, STOP_VALUE):
        try:
            process(item)
        # Be very defensive about errors since they can corrupt pipes.
        except Exception as e:
            logger.error(e)
        finally:
            q.task_done()

q = mp.JoinableQueue()
# Consumers are plain Processes: a JoinableQueue can be inherited by a
# child process but can't be pickled into a Pool task.
consumers = [mp.Process(target=consume, args=(q,)) for _ in range(mp.cpu_count())]
for c in consumers:
    c.start()

with mp.Pool() as pool:
    for i in range(0, len(URLS), BATCH_SIZE):
        # Put BATCH_SIZE items on the queue asynchronously. The callback
        # receives the whole result list, so queue the results one by one.
        pool.map_async(expensive_func, URLS[i:i + BATCH_SIZE],
                       callback=lambda results: [q.put(r) for r in results])
        # Wait for the queue to empty.
        q.join()
        notify_users()

# Stop the consumers so we can exit cleanly.
for _ in consumers:
    q.put(STOP_VALUE)
for c in consumers:
    c.join()

NB: I haven't actually run this code. If you pull items off the queue faster than you put them on (the queue can momentarily empty before map_async's callback fires), join() may release early. In that case this code sends an update AT LEAST every 1000 items, and maybe more often. For progress updates, that's probably OK. If it matters that each batch be exactly 1000, you could use an mp.Value('i', 0) and check that it reaches 1000 whenever your join() releases.
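A hedged sketch of that counter idea, showing only the pieces that would change (the helper names are the same placeholders as above):

import multiprocessing as mp

STOP_VALUE = 'STOP'
processed = mp.Value('i', 0)  # shared integer counter

def consume(q, processed):
    for item in iter(q.get, STOP_VALUE):
        try:
            process(item)               # placeholder work function
            with processed.get_lock():  # the Value's lock makes += atomic
                processed.value += 1
        finally:
            q.task_done()

# In the parent, each time q.join() releases:
# with processed.get_lock():
#     done = processed.value
#     processed.value = 0
# if done >= BATCH_SIZE:  # only notify for a full batch
#     notify_users()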

answered Sep 22 '22 by Ben