
Asynchronously wait for multiprocessing Queue in main process

I have the following scenario: multiple worker processes send events about their current status to an event dispatcher. This event dispatcher then needs to process all the events if we are in the main process or signal the event dispatcher of the main process to handle these events if we are in a worker process.

The main crux here is that event handling must also be in the main thread of the main process, so I can't just run a while True loop inside a thread and wait for messages from worker processes there.

So what I have is this:

import asyncio
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import current_process, Process, Queue
from threading import current_thread
from time import sleep

def get_q(q):
    print("Waiting for the queue ({} / {})\n".format(current_thread().name, current_process().name))
    return q.get()

async def message_q(q):
    while True:
        f = loop.run_in_executor(None, get_q, q)

        await f

        if f.result() is None:
            print("Done")
            return

        print("Got the result ({} / {})".format(current_thread().name, current_process().name))
        print("Result is: {}\n".format(f.result()))

async def something_else():
    while True:
        print("Something else\n")
        await asyncio.sleep(2)

def other_process(q):
    for i in range(5):
        print("Putting something in the queue ({})".format(current_process().name))
        q.put(i)
        sleep(1)

    q.put(None)

q = Queue()

Process(target=other_process, args=(q,), daemon=True).start()

loop = asyncio.get_event_loop()
loop.set_default_executor(ThreadPoolExecutor(max_workers=1))
asyncio.ensure_future(message_q(q))
asyncio.ensure_future(something_else())
loop.run_until_complete(asyncio.sleep(6))

other_process() is an example worker process that uses a Queue to signal the main process, which runs an event loop to process stuff and also waits for any data on the Queue. In the real case, this process would signal the event dispatcher, which would then handle the queue messaging and pass the message on to the main process event dispatcher, but here I simplified it a bit.

However, I'm not quite satisfied with this. Submitting get_q() again and again to a ThreadPoolExecutor adds overhead and isn't as clean as a single long-running thread. Also, the await f there isn't optimal: it blocks as soon as no further data is in the queue, which prevents the event loop from exiting. My workaround is to send None after the workers have finished and to exit message_q() when None appears in the queue.

Is there any better way to implement this? Performance is quite crucial and I would like to keep the Queue object local to the event dispatcher and not pass it to the code that manages the worker processes (or require calling some sort of finalize() method thereof).

asked Oct 18 '22 by Janek Bevendorff

1 Answer

I implemented this now as an async context manager. The context manager calls

asyncio.ensure_future(message_q())

in its __aenter__() method and adds None to the queue in its __aexit__() method to shut down the endless loop in message_q().

The context manager can then be used in an async with statement around the process-spawning code section, eliminating the need to call a shutdown method manually. It is, however, advisable to call await asyncio.sleep(0) inside the __aenter__() method after scheduling the message_q() coroutine, to allow the context manager to initialize the queue listener. Otherwise, message_q() will not be called immediately. That is not a problem per se (because the queue is filled anyway), but it delays event forwarding until the next await occurs in the code.
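A minimal sketch of such a context manager could look like this (class and attribute names like QueueListener and received are my own invention, not from the answer; the received list stands in for the real event dispatching):

```python
import asyncio
from multiprocessing import Queue


class QueueListener:
    """Hypothetical sketch of the async context manager described above."""

    def __init__(self):
        self.queue = Queue()
        self.received = []  # stands in for forwarding to the event dispatcher

    async def _message_q(self):
        loop = asyncio.get_running_loop()
        while True:
            # q.get() blocks, so run it in a worker thread
            # to keep the event loop responsive.
            msg = await loop.run_in_executor(None, self.queue.get)
            if msg is None:  # sentinel: shut down the listener
                return
            self.received.append(msg)

    async def __aenter__(self):
        self._task = asyncio.ensure_future(self._message_q())
        # Yield control once so the listener coroutine actually starts
        # before the body of the async with block runs.
        await asyncio.sleep(0)
        return self.queue

    async def __aexit__(self, exc_type, exc, tb):
        self.queue.put(None)  # terminates the endless loop in _message_q()
        await self._task
```

The process-spawning code would then sit inside `async with QueueListener() as q: ...`, and the queue shuts down automatically when the block exits.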

The processes should be spawned using a ProcessPoolExecutor together with loop.run_in_executor(), so waiting for the processes doesn't block the event loop.
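For example (a sketch: cpu_bound_worker is a made-up stand-in for the real worker, and the fork context is used only to keep the snippet self-contained on Unix; real code would use the default context with a __main__ guard):

```python
import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor


def cpu_bound_worker(n):
    # Stands in for the real worker process body.
    return sum(i * i for i in range(n))


async def main():
    loop = asyncio.get_running_loop()
    ctx = multiprocessing.get_context("fork")  # assumption: Unix host
    with ProcessPoolExecutor(mp_context=ctx) as pool:
        # Awaiting the executor futures yields to the event loop
        # instead of blocking it while the worker processes run.
        return await asyncio.gather(
            *(loop.run_in_executor(pool, cpu_bound_worker, n) for n in (10, 100))
        )


results = asyncio.run(main())
```

This way the event loop keeps running (and forwarding events) while the main process waits for the workers to finish.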

Instead of using a Queue, you may also want to use a JoinableQueue to make sure all events have been processed before exiting the context manager.
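The JoinableQueue bookkeeping could look roughly like this (a synchronous sketch just to show the task_done()/join() pairing; drain is a hypothetical consumer):

```python
from multiprocessing import JoinableQueue


def drain(q):
    """Consume events until the None sentinel, acknowledging each one."""
    events = []
    while True:
        item = q.get()
        try:
            if item is None:
                return events
            events.append(item)
        finally:
            q.task_done()  # balance every get() so q.join() can return


q = JoinableQueue()
for i in range(3):
    q.put(i)
q.put(None)  # sentinel

events = drain(q)
q.join()  # returns only once every item has been marked done
```

In the context manager, __aexit__() would call q.join() (via run_in_executor, so it doesn't block the loop) before returning, guaranteeing no events are lost on shutdown.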

answered Oct 20 '22 by Janek Bevendorff