So I'm using the Python `asyncio` module (on Linux) to launch a child process and then asynchronously monitor it. My code works fine... when run on the main thread. But when I run it on a worker thread, it hangs, and the `process_exited` callback is never invoked.
I suspect this may actually be some kind of undocumented defect or issue with running `subprocess_exec` on a worker thread, likely having to do with how the implementation handles signals in a background thread. But it could also just be me screwing things up.
A simple, reproducible example is as follows:
```python
import asyncio

class MyProtocol(asyncio.SubprocessProtocol):
    def __init__(self, done_future):
        super().__init__()
        self._done_future = done_future

    def pipe_data_received(self, fd, data):
        print("Received:", len(data))

    def process_exited(self):
        print("PROCESS EXITED!")
        self._done_future.set_result(None)

def run(loop):
    done_future = asyncio.Future(loop=loop)
    transport = None
    try:
        transport, protocol = yield from loop.subprocess_exec(
            lambda: MyProtocol(done_future),
            "ls",
            "-lh",
            stdin=None
        )
        yield from done_future
    finally:
        if transport:
            transport.close()
    return done_future.result()

def run_loop():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)  # bind event loop to current thread
    try:
        return loop.run_until_complete(run(loop))
    finally:
        loop.close()
```
So here, I set up an `asyncio` event loop to execute the command `ls -lh`, and then trigger one callback when data is received from the subprocess and another when the subprocess exits.
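For comparison on a current Python (3.7+ for `asyncio.run()`), here is a sketch of the same protocol setup in `async`/`await` syntax. `RecordingProtocol` and `record_callbacks()` are hypothetical names, and the running interpreter stands in for `ls -lh` so the child's output is predictable. Instead of resolving on `process_exited()`, it logs every callback in order and resolves on `connection_lost()`, which (as far as I can tell from the asyncio source) fires only after the child has exited and all of its pipes have closed.

```python
import asyncio
import sys


class RecordingProtocol(asyncio.SubprocessProtocol):
    """Hypothetical helper: logs each callback name in the order it fires."""

    def __init__(self, done_future):
        super().__init__()
        self.events = []
        self._done_future = done_future

    def connection_made(self, transport):
        self.events.append("connection_made")

    def pipe_data_received(self, fd, data):
        # fd is 1 for stdout, 2 for stderr
        self.events.append("pipe_data_received")

    def pipe_connection_lost(self, fd, exc):
        self.events.append("pipe_connection_lost")

    def process_exited(self):
        self.events.append("process_exited")

    def connection_lost(self, exc):
        # Called once the child has exited *and* all of its pipes are closed.
        self.events.append("connection_lost")
        if not self._done_future.done():
            self._done_future.set_result(None)


async def record_callbacks():
    loop = asyncio.get_running_loop()
    done = loop.create_future()
    # Assumption: the current interpreter stands in for `ls -lh`.
    transport, protocol = await loop.subprocess_exec(
        lambda: RecordingProtocol(done),
        sys.executable, "-c", "print('hello')",
        stdin=None,
    )
    try:
        await done
        return transport.get_returncode(), protocol.events
    finally:
        transport.close()


if __name__ == "__main__":
    print(asyncio.run(record_callbacks()))
```

Waiting on `connection_lost()` is a design choice: `process_exited()` is not guaranteed to arrive after the last `pipe_data_received()`, so resolving on it can return while output is still pending.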
If I just call `run_loop()` directly in the main thread of a Python program, everything goes fine. But if I say:
```python
import threading

t = threading.Thread(target=run_loop)
t.start()
t.join()
```
Then the `pipe_data_received()` callback is invoked successfully, but `process_exited()` is never invoked, and the program just hangs.
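A defensive sketch, not a fix: bounding the wait with `asyncio.wait_for()` turns a silent hang like this into a `TimeoutError` that the worker thread can report. It uses the high-level `asyncio.create_subprocess_exec()` in place of the question's protocol class for brevity; `run_with_deadline()` and `worker()` are hypothetical names, and it assumes Python 3.7+ for `asyncio.run()`. On Python 3.8 and later the default child watcher no longer depends on the main thread, so there the child should finish normally and the timeout should not fire.

```python
import asyncio
import sys
import threading


async def run_with_deadline(timeout=10.0):
    """Run a short child process, but give up after *timeout* seconds."""
    # High-level API used here in place of the question's protocol class.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "print('hi')",
        stdout=asyncio.subprocess.PIPE,
    )
    try:
        # communicate() returns only after EOF on the pipe *and* the child's
        # exit status has been delivered, so a lost exit notification
        # surfaces here as a timeout instead of an indefinite hang.
        await asyncio.wait_for(proc.communicate(), timeout)
    except asyncio.TimeoutError:
        if proc.returncode is None:
            proc.kill()
        raise
    return proc.returncode


def worker(results):
    # Each thread gets its own event loop via asyncio.run().
    try:
        results.append(asyncio.run(run_with_deadline()))
    except Exception as exc:  # report failures instead of dying silently
        results.append(exc)


if __name__ == "__main__":
    results = []
    t = threading.Thread(target=worker, args=(results,))
    t.start()
    t.join()
    print(results)
```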
After Googling around and looking at the `asyncio` source code for the implementation of `unix_events.py`, I discovered it might be necessary to manually attach my event loop to the global "child watcher" object, as follows:
```python
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)  # bind event loop to current thread
asyncio.get_child_watcher().attach_loop(loop)
```
Apparently, the child watcher is an (undocumented) object that is responsible for calling `waitpid` under the hood (or something like that). But when I tried this and ran `run_loop()` in a background thread, I got the error:
```
  File "/usr/lib/python3.4/asyncio/unix_events.py", line 77, in add_signal_handler
    raise RuntimeError(str(exc))
RuntimeError: set_wakeup_fd only works in main thread
```
So it looks like the implementation explicitly checks that signal handlers are only installed on the main thread, which leads me to believe that, in the current implementation, using `subprocess_exec` on a background thread is simply impossible without changing the Python source code itself.
Am I correct about this? Sadly, the `asyncio` module is very under-documented, so it's hard for me to be confident about my conclusion here. I may simply be doing something wrong.
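Some context for the traceback: the watcher involved there relies on a `SIGCHLD` handler, and installing signal handlers (and calling `signal.set_wakeup_fd()`, as the traceback shows) is restricted to the main thread. Waiting for a child does not inherently require signals, though. The sketch below shows the general idea of a thread-based approach, conceptually similar to what Python 3.8's `ThreadedChildWatcher` does but not its actual code: a helper thread blocks in `os.waitpid()` and hands the exit code back to the loop with `loop.call_soon_threadsafe()`. `watch_child()` and `spawn_and_wait()` are hypothetical names, and `os.posix_spawn()` assumes Python 3.8+ on a POSIX system.

```python
import asyncio
import os
import sys
import threading


def watch_child(loop, pid, callback):
    """Hypothetical helper: wait for *pid* in a daemon thread, then run
    callback(exit_code) on *loop*'s own thread. No signal handlers needed."""

    def waiter():
        _, status = os.waitpid(pid, 0)  # blocks only this helper thread
        if os.WIFEXITED(status):
            code = os.WEXITSTATUS(status)
        else:
            code = -os.WTERMSIG(status)  # child was killed by a signal
        # Event loop methods are not thread-safe; call_soon_threadsafe()
        # is the supported way to hand work back to the loop's thread.
        loop.call_soon_threadsafe(callback, code)

    threading.Thread(target=waiter, daemon=True).start()


async def spawn_and_wait():
    loop = asyncio.get_running_loop()
    exited = loop.create_future()
    argv = [sys.executable, "-c", "raise SystemExit(3)"]
    pid = os.posix_spawn(sys.executable, argv, os.environ)
    watch_child(loop, pid, exited.set_result)
    return await exited


if __name__ == "__main__":
    # Runs from a non-main thread because no signal handler is installed.
    results = []
    t = threading.Thread(target=lambda: results.append(asyncio.run(spawn_and_wait())))
    t.start()
    t.join()
    print(results)
```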
Handling subprocesses in a worker thread is fine as long as an asyncio loop is running in the main thread with its child watcher instantiated:
```python
asyncio.get_child_watcher()
loop = asyncio.get_event_loop()
coro = loop.run_in_executor(None, run_loop)
loop.run_until_complete(coro)
```
See this post and the documentation.
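The snippet above targets Python 3.4. Here is a sketch of the same arrangement on a current Python, assuming 3.9+ for `asyncio.to_thread()`: the main thread runs an event loop and hands a blocking loop-running function to a worker thread. The explicit `asyncio.get_child_watcher()` call is left out because child watchers were deprecated in Python 3.12, and since 3.8 the default watcher does not depend on the main thread. `run_child()`, `worker_loop()` and `main()` are hypothetical names.

```python
import asyncio
import sys


async def run_child():
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "print('child output')",
        stdout=asyncio.subprocess.PIPE,
    )
    out, _ = await proc.communicate()
    return proc.returncode, out


def worker_loop():
    # Runs in the worker thread; asyncio.run() creates that thread's own loop.
    return asyncio.run(run_child())


async def main():
    # The main-thread loop hands the blocking worker_loop() to a thread
    # and awaits its result without blocking itself.
    return await asyncio.to_thread(worker_loop)


if __name__ == "__main__":
    print(asyncio.run(main()))
```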