Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Python asyncio: Running subprocess_exec on a worker thread

So I'm using the Python asyncio module (on Linux) to launch a child process and then asynchronously monitor it. My code works fine... when run on the main thread. But when I run it on a worker thread, it hangs, and the process_exited callback is never invoked.

I suspect this may actually be some kind of undocumented defect or issue with running subprocess_exec on a worker thread, likely having to do with how the implementation handles signals in a background thread. But it could also just be me screwing things up.

A simple, reproducible example is as follows:

class MyProtocol(asyncio.SubprocessProtocol):
    def __init__(self, done_future):
        super().__init__()
        self._done_future = done_future

    def pipe_data_received(self, fd, data):
        print("Received:", len(data))

    def process_exited(self):
        print("PROCESS EXITED!")
        self._done_future.set_result(None)

def run(loop):
    done_future = asyncio.Future(loop = loop)
    transport = None
    try:
        transport, protocol = yield from loop.subprocess_exec(
            lambda : MyProtocol(done_future),
            "ls",
            "-lh",
            stdin = None
        )
        yield from done_future
    finally:
        if transport: transport.close()

    return done_future.result()

def run_loop():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop) # bind event loop to current thread

    try:
        return loop.run_until_complete(run(loop))
    finally:
        loop.close()

So here, I setup an asyncio event loop to execute the shell command ls -lh, and then trigger a callback for when data is received from the subprocess, and another callback for when the subprocess exits.

If I just call run_loop() directly in the main thread of a Python program, everything goes fine. But if I say:

t = threading.Thread(target = run_loop)
t.start()
t.join()

Then what happens is that the pipe_data_received() callback is invoked successfully, but process_exited() is never invoked, and the program just hangs.

After Googling around and looking at the asyncio source code for the implementation of unix_events.py, I discovered it might be necessary to manually attach my event loop to the global "child watcher" object, as follows:

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) # bind event loop to current thread
asyncio.get_child_watcher().attach_loop(loop)

Apparently, the child watcher is an (undocumented) object that is responsible for calling waitpid under the hood (or something like that). But when I tried this, and ran run_event_loop() in a background thread, I got the error:

  File "/usr/lib/python3.4/asyncio/unix_events.py", line 77, in add_signal_handler
    raise RuntimeError(str(exc))
RuntimeError: set_wakeup_fd only works in main thread

So here it looks like the implementation actually does a check to make sure that signal handlers can only be used on the main thread, leading me to believe that in the current implementation, using subprocess_exec on a background thread is in fact, simply impossible without changing the Python source code itself.

Am I correct about this? Sadly, the asyncio module is very under-documented, so it's hard for me to be confident about my conclusion here. I may simply be doing something wrong.

like image 464
Siler Avatar asked Jun 21 '17 19:06

Siler


People also ask

Does Asyncio use multiple threads?

Asyncio is a library to execute these coroutines in an asynchronous fashion using a concurrency model known as a single-threaded event loop.

How do you stop a running event loop?

Run the event loop until stop() is called. If stop() is called before run_forever() is called, the loop will poll the I/O selector once with a timeout of zero, run all callbacks scheduled in response to I/O events (and those that were already scheduled), and then exit.

Does Run_until_complete block?

run_until_complete is used to run a future until it's finished. It will block the execution of code following it. It does, however, cause the event loop to run. Any futures that have been scheduled will run until the future passed to run_until_complete is done.

Is Popen asynchronous?

Create a subprocess: low-level API using subprocess. Popen. Run subprocesses asynchronously using the subprocess module.


1 Answers

Handling subprocesses in a worker thread is fine as long as an asyncio loop is running in the main thread with its child watcher instanciated:

asyncio.get_child_watcher()
loop = asyncio.get_event_loop()
coro = loop.run_in_executor(None, run_loop)
loop.run_until_complete(coro)

See this post and the documentation.

like image 100
Vincent Avatar answered Oct 16 '22 21:10

Vincent