The code below first starts multiple processes, then runs a while True loop checking the queue objects. Lastly, it iterates over the processes to check whether any is still alive; after all the processes have completed, it breaks the while loop.
Unfortunately, the break can happen while a queue object is not empty. Breaking the loop without retrieving data stored in a queue is an easy-to-overlook data loss. How can the code logic be modified so that it ensures every queue object is empty before breaking the loop?
import time, multiprocessing, os

logger = multiprocessing.log_to_stderr()

def foo(*args):
    for i in range(3):
        queue = args[0]
        queue.put(os.getpid())

items = dict()
for i in range(5):
    queue = multiprocessing.Queue()
    proc = multiprocessing.Process(target=foo, args=(queue,))
    items[proc] = queue
    proc.start()

time.sleep(0.1)
while True:
    time.sleep(1)
    for proc, queue in items.items():
        if not queue.empty():
            print(queue.get())
    if not True in [proc.is_alive() for proc in items]:
        if not queue.empty():
            logger.warning('...not empty: %s' % queue.get())
        break
This is a synchronization issue again: when you check a queue and find it empty, there is no guarantee that new items won't arrive later.
You can put a sentinel on the queue when a subprocess finishes its job, to signal that there will be no more items from it; the parent process then drains the queue until it gets the sentinel. This is also the method used by multiprocessing.Pool. You can use None as the sentinel here:
def foo(*args):
    for i in range(3):
        queue = args[0]
        queue.put(os.getpid())
    queue.put(None)

...

while items:
    for proc in tuple(items.keys()):
        queue = items[proc]
        if not queue.empty():
            r = queue.get()
            print(r)
            if r is None:
                proc.join()
                del items[proc]
    time.sleep(0.1)
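For reference, here is a minimal self-contained version of the sentinel approach. The three puts per worker and the five processes mirror the question's code; the __main__ guard and the reordered None check are assumptions added only to make the sketch runnable on its own:

import os
import time
import multiprocessing

def foo(queue):
    # Produce three items, then a sentinel marking the end of this worker's output.
    for i in range(3):
        queue.put(os.getpid())
    queue.put(None)  # sentinel: no more items will come from this process

if __name__ == '__main__':
    items = dict()
    for i in range(5):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=foo, args=(queue,))
        items[proc] = queue
        proc.start()

    # Drain every queue until its sentinel arrives; only then drop the process.
    # A transient empty() is harmless here: the item is simply picked up on a
    # later pass, and the loop only ends once every sentinel has been seen.
    while items:
        for proc in tuple(items.keys()):
            queue = items[proc]
            if not queue.empty():
                r = queue.get()
                if r is None:
                    proc.join()
                    del items[proc]
                else:
                    print(r)
        time.sleep(0.1)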
A working solution is posted below. Instead of starting separate processes with multiprocessing.Process, this approach uses the multiprocessing.pool.ThreadPool.map_async method, which starts the workers without blocking. A multiprocessing.Queue object is then used to store the data, and it is accessible to the foo function because the pool's workers are threads of the MainProcess.
import time, multiprocessing
from multiprocessing.pool import ThreadPool

logger = multiprocessing.log_to_stderr()

def foo(args):
    queue = args[0]
    arg = args[1]
    for i in range(3):
        time.sleep(2)
        queue.put([arg, time.time()])

pool = ThreadPool(processes=4)
queue = multiprocessing.Queue()
map_result = pool.map_async(foo, [(queue, arg) for arg in range(3)])
logger.warning("map_result: %s" % map_result)

map_result.wait(timeout=10)
if not map_result.ready():
    message = '%s is timed out and terminated.' % pool
    logger.error(message)
    pool.terminate()
    raise Exception(message)

# All workers are done at this point, so draining until empty is safe.
while not queue.empty():
    logger.warning("queue_data: %r" % queue.get(True, 0.1))

pool.close()
pool.join()
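A variation on the final drain, sketched under the assumption that all workers have already finished (which map_result.wait guarantees above): instead of polling queue.empty(), let Queue.get raise the standard-library Empty exception on timeout. The drain helper name here is hypothetical:

import multiprocessing
from queue import Empty  # exception raised by Queue.get when the timeout expires

def drain(mp_queue, timeout=0.1):
    # Collect everything currently in the queue; stop once get() times out.
    # Only reliable after the producers are known to be done, otherwise a
    # slow producer is indistinguishable from an exhausted queue.
    results = []
    while True:
        try:
            results.append(mp_queue.get(True, timeout))
        except Empty:
            return results

if __name__ == '__main__':
    q = multiprocessing.Queue()
    for i in range(3):
        q.put(i)
    print(drain(q))  # [0, 1, 2]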