I have a situation where the subprocess communicate hangs when i have to run the subprocess inside an asyncio event loop, and the whole thing is inside a separate thread.
I learned that in order to run subprocess in separate thread, i need to have
1. an event loop running in main thread, and
2. a child watcher must be initiated in main thread.
After having the above conditions i got my subprocess work. But the subprocess.communicate is hanging now. The same code is working if call it from the main thread.
After digging further i observed that communicate is hanging because the process is not finishing on its own. ie await process.wait()
is actually hanging.
I have seen communicate hang when the command i am trying to issue in subprocess itself hangs, but that is not the case here.
import asyncio
import shlex
import threading
import subprocess
async def sendcmd(cmd):
cmdseq = tuple(shlex.split(cmd))
print(cmd)
p = await asyncio.create_subprocess_exec(*cmdseq, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print(p.pid)
output = (await asyncio.wait_for(p.communicate(), 5))[0]
output = output.decode('utf8')
print(output)
return output
async def myfunc(cmd):
o = await sendcmd(cmd)
return o
def myfunc2():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
tasks = []
tasks.append(asyncio.ensure_future(myfunc('uname -a')))
loop.run_until_complete(asyncio.gather(*tasks))
async def myfunc3():
t = threading.Thread(target=myfunc2)
t.start()
t.join()
def main():
asyncio.get_child_watcher()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.ensure_future(myfunc3()))
loop.close()
main()
I think this fixes it. Use loop run_in_executor for the Threads.
import asyncio
import shlex
import threading
import subprocess
import logging
async def sendcmd(cmd):
cmdseq = tuple(shlex.split(cmd))
print(cmd)
p = await asyncio.create_subprocess_exec(*cmdseq, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print(p.pid)
output = (await asyncio.wait_for(p.communicate(), 5))[0]
output = output.decode('utf8')
print(output)
return output
async def myfunc(cmd):
o = await sendcmd(cmd)
return o
def myfunc2():
thread_loop = asyncio.new_event_loop()
asyncio.set_event_loop(thread_loop)
thread_loop.set_debug(True)
tasks = []
tasks.append(asyncio.ensure_future(myfunc('uname -a')))
thread_loop.run_until_complete(asyncio.gather(*tasks))
thread_loop.close()
async def myfunc3(loop=None):
await loop.run_in_executor(None, myfunc2)
def main():
logfilename='test.log'
print('Writing log to {}'.format(logfilename))
logging.basicConfig(filename=logfilename, level=logging.INFO, format='%(asctime)s %(name)s %(module)s %(levelname)-8s %(message)s')
logging.getLogger('asyncio').setLevel(logging.DEBUG)
root = logging.getLogger(__name__)
cw=asyncio.get_child_watcher()
main_loop = asyncio.get_event_loop()
main_loop.run_until_complete(asyncio.ensure_future(myfunc3(loop=main_loop)))
cw.close()
main_loop.close()
main()
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With