 

Python multiprocessing queue get() timeout despite full queue

I am using Python's multiprocessing module for scientific parallel processing. My code uses several worker processes that do the heavy lifting and a writer process that persists the results to disk. The data to be written is sent from the worker processes to the writer process via a Queue. The data itself is rather simple and consists solely of a tuple holding a filename and a list with two floats. After several hours of processing, the writer process often gets stuck. More precisely, the following block of code

while True:
    try:
        item = queue.get(timeout=60)
        break
    except Exception as error:
        logging.info("Writer: Timeout occurred {}".format(str(error)))

will never exit the loop and I get continuous 'Timeout' messages.
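
For reference, the overall structure of my pipeline looks roughly like this (simplified; the worker count, queue size, and payload are placeholders, not my actual code):

import logging
from multiprocessing import Process, Queue

def worker(queue):
    # Heavy computation happens here; each result is a tuple of
    # a filename and a list with two floats.
    queue.put(("result_0001.dat", [0.5, 1.5]))  # placeholder payload

def writer(queue):
    while True:
        try:
            item = queue.get(timeout=60)
        except Exception as error:
            logging.info("Writer: Timeout occurred {}".format(str(error)))
            continue
        # ... persist item to disk ...

if __name__ == "__main__":
    queue = Queue(maxsize=48)  # qsize() reports 48 when the writer hangs
    workers = [Process(target=worker, args=(queue,)) for _ in range(4)]
    writer_proc = Process(target=writer, args=(queue,))
    for p in workers + [writer_proc]:
        p.start()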

I also implemented a logging process which reports, among other things, the status of the queue. Even though I get the timeout error message above, a call to qsize() constantly reports a full queue (size = 48 in my case).

I have thoroughly checked the documentation on the queue object and can find no possible explanation for why get() times out while the queue is full at the same time.

Any ideas?

Edit:

I modified the code to make sure I catch an empty queue exception:

from queue import Empty  # the "Queue" module on Python 2

while True:
    try:
        item = queue.get(timeout=60)
        break
    except Empty as error:
        logging.info("Writer: Timeout occurred {}".format(str(error)))
AndreJohannes, asked May 22 '17



2 Answers

In multiprocessing, a queue can be used as a synchronized task queue, which seems to be what you want here. That requires more than just calling get(), though: after each task is processed you need to call task_done() so that the queue's count of unfinished tasks is decremented. Note that in multiprocessing this is provided by JoinableQueue; the plain multiprocessing.Queue has no task_done().

From documentation:

Queue.task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

The documentation also includes a code example of proper queue usage.
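
As a minimal sketch of that pattern, assuming a multiprocessing.JoinableQueue (the names here are illustrative):

from multiprocessing import JoinableQueue, Process

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:       # sentinel: no more work
            queue.task_done()
            break
        # ... process item ...
        queue.task_done()      # mark this task as finished

if __name__ == "__main__":
    queue = JoinableQueue()
    proc = Process(target=consumer, args=(queue,))
    proc.start()
    for i in range(10):
        queue.put(i)
    queue.put(None)            # tell the consumer to stop
    queue.join()               # returns once every put() got a task_done()
    proc.join()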

Applied to your code, it would look like this:

while True:
    try:
        item = queue.get(timeout=60)
        if item is None:
            break
        # call your worker function here
        queue.task_done()  # requires a JoinableQueue
    except Exception as error:
        logging.info("Writer: Timeout occurred {}".format(str(error)))
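
The None item is a shutdown sentinel: once all real work has been submitted, the producer side must put one None per consumer so the loop above can terminate. Roughly (num_consumers is a placeholder for however many writer processes you run):

# Producer side: signal shutdown with one sentinel per consumer.
for _ in range(num_consumers):
    queue.put(None)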
Tomasz Plaskota, answered Sep 28 '22


Switching to a manager-based queue should help solve this issue.

from multiprocessing import Manager

manager = Manager()
queue = manager.Queue()

For more details, see the multiprocessing documentation: https://docs.python.org/2/library/multiprocessing.html
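
Put together, a minimal sketch of how the workers and the writer would share the manager-based queue (worker count and payload are placeholders based on the question):

from multiprocessing import Manager, Process

def worker(queue):
    queue.put(("result_0001.dat", [0.5, 1.5]))  # placeholder payload

def writer(queue, n_items):
    for _ in range(n_items):
        item = queue.get(timeout=60)
        # ... persist item to disk ...

if __name__ == "__main__":
    manager = Manager()
    queue = manager.Queue()  # proxy served by the manager process
    workers = [Process(target=worker, args=(queue,)) for _ in range(4)]
    writer_proc = Process(target=writer, args=(queue, len(workers)))
    for p in workers + [writer_proc]:
        p.start()
    for p in workers + [writer_proc]:
        p.join()

Unlike multiprocessing.Queue, which moves data through a pipe fed by a background feeder thread, a manager queue is a proxy to a regular queue living in a separate server process, which avoids the feeder-thread buffering at the cost of extra IPC overhead.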

m3th0d, answered Sep 28 '22