How to ensure the multiprocessing queue is empty

The code below first starts multiple processes. It then runs a while True loop that checks the queue objects, and finally iterates over the processes to see whether any are still alive. Once all the processes have completed, it breaks out of the while loop. Unfortunately, this can happen while a queue object is still not empty. Breaking the loop without retrieving the data left in a queue is an easy-to-overlook source of data loss. How should the logic be modified so that it ensures every queue object is empty before breaking the loop?

import time, multiprocessing, os
logger = multiprocessing.log_to_stderr()

def foo(*args):
    for i in range(3):
        queue = args[0]
        queue.put(os.getpid())

items = dict()
for i in range(5):
    queue = multiprocessing.Queue()
    proc = multiprocessing.Process(target=foo, args=(queue,))
    items[proc] = queue
    proc.start()
    time.sleep(0.1)

while True:
    time.sleep(1)

    for proc, queue in items.items():
        if not queue.empty():
            print(queue.get()) 

    if not True in [proc.is_alive() for proc in items]:
        if not queue.empty():
            logger.warning('...not empty: %s' % queue.get()) 
        break 
asked by alphanumeric


2 Answers

This is a synchronization issue, again: when you check a queue and find it empty, there is no guarantee that no new item will arrive later.

You could put a sentinel into the queue when a subprocess finishes its job, to signal that there will be no more items in the queue. The parent process can then drain the queue until it gets the sentinel. This is also the method used by multiprocessing.Pool. You could use None as the sentinel here:

def foo(*args):
    for i in range(3):
        queue = args[0]
        queue.put(os.getpid())
    queue.put(None)

...

while items:
    for proc in tuple(items.keys()):
        queue = items[proc]
        if not queue.empty():
            r = queue.get()
            print(r)
            if r is None:
                proc.join()
                del items[proc]
    time.sleep(0.1)
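
For reference, the sentinel approach above can be run end to end by filling in the elided setup with the process-spawning code from the question. In this sketch foo takes the queue directly instead of *args, and a __main__ guard is added for spawn-based platforms; everything else follows the drain loop above:

import os, time, multiprocessing

def foo(queue):
    # Produce a few items, then a None sentinel to signal "no more items".
    for i in range(3):
        queue.put(os.getpid())
    queue.put(None)

if __name__ == '__main__':
    items = dict()
    for i in range(5):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=foo, args=(queue,))
        items[proc] = queue
        proc.start()

    # Keep draining until every worker has delivered its sentinel.
    while items:
        for proc in tuple(items.keys()):
            queue = items[proc]
            if not queue.empty():
                r = queue.get()
                if r is None:
                    proc.join()          # sentinel received: this worker is done
                    del items[proc]
                else:
                    print(r)
        time.sleep(0.1)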
answered by georgexsh


A working solution is posted below. Instead of spawning the workers with multiprocessing.Process, this approach uses the multiprocessing.pool.ThreadPool.map_async method, which dispatches the workers without blocking. A multiprocessing.Queue object is then used to store the data, and it is accessible to the foo function, which runs in threads of the MainProcess.

import time, multiprocessing
from multiprocessing.pool import ThreadPool
logger = multiprocessing.log_to_stderr()

def foo(args):
    queue = args[0]
    arg = args[1]
    for i in range(3):
        time.sleep(2)
        queue.put([arg, time.time()])

pool = ThreadPool(processes=4)
queue = multiprocessing.Queue()
map_result = pool.map_async(foo, [(queue, arg) for arg in range(3)])

logger.warning("map_result: %s" % map_result) 

map_result.wait(timeout=10)
if not map_result.ready():
    message = '%s timed out and was terminated.' % pool
    logger.error(message)
    pool.terminate()
    raise Exception(message)

while not queue.empty():
    logger.warning("queue_data: %r" % queue.get(True, 0.1))

pool.close()
pool.join()
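
The final while loop relies on queue.empty(), which is reliable at this point only because map_result.wait() has already confirmed the workers are finished. An equivalent way to drain that avoids the empty()-then-get() two-step is to call get_nowait() and treat the stdlib queue.Empty exception as the stop condition. A minimal sketch, assuming Python 3 (the exception lives in the Queue module on Python 2), with drain_queue being a hypothetical helper name rather than part of the answer:

import queue as stdlib_queue  # stdlib module, used here only for its Empty exception

def drain_queue(mp_queue, log):
    # Empty a multiprocessing.Queue without polling empty() before each get().
    while True:
        try:
            item = mp_queue.get_nowait()
        except stdlib_queue.Empty:
            return
        log.warning("queue_data: %r" % item)

Calling drain_queue(queue, logger) in place of the loop above gives the same result.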
answered by alphanumeric