Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why does asyncio subprocess.communicate hang when called in a different thread?

I have a situation where the subprocess `communicate()` call hangs when I run the subprocess inside an asyncio event loop and the whole thing runs inside a separate thread.

I learned that in order to run a subprocess in a separate thread, I need to have

1. an event loop running in the main thread, and
2. a child watcher instantiated in the main thread.

After satisfying the above conditions I got my subprocess to work, but `communicate()` now hangs. The same code works if I call it from the main thread.

After digging further I observed that `communicate()` hangs because the process does not finish on its own, i.e. `await process.wait()` is what actually hangs.

I have seen `communicate()` hang when the command I run in the subprocess itself hangs, but that is not the case here.

import asyncio
import shlex
import threading
import subprocess
async def sendcmd(cmd):
    """Run *cmd* as a child process and return its decoded standard output.

    The command string is split with ``shlex`` and executed directly, without
    a shell. The command, the child's pid and the captured output are printed
    along the way. ``communicate()`` is given five seconds by
    ``asyncio.wait_for``.
    """
    argv = shlex.split(cmd)
    print(cmd)
    proc = await asyncio.create_subprocess_exec(
        *argv,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    print(proc.pid)
    # stderr is captured as well, but only stdout is used.
    stdout_bytes, _ = await asyncio.wait_for(proc.communicate(), 5)
    text = stdout_bytes.decode('utf8')
    print(text)
    return text


async def myfunc(cmd):
    """Await ``sendcmd(cmd)`` and hand its result straight back."""
    return await sendcmd(cmd)

def myfunc2():
    """Thread entry point: run ``myfunc('uname -a')`` on a fresh event loop.

    A new loop is created and installed as the calling thread's current loop,
    then driven until the gathered task has finished.
    """
    worker_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(worker_loop)
    # ensure_future() picks up the loop that was just installed above.
    pending = [asyncio.ensure_future(myfunc('uname -a'))]
    worker_loop.run_until_complete(asyncio.gather(*pending))

async def myfunc3():
    """Run ``myfunc2`` in a new thread and wait for that thread to finish.

    NOTE: ``join()`` is an ordinary blocking call made from inside a
    coroutine, so the event loop running this coroutine cannot do anything
    else until the worker thread has exited.
    """
    worker = threading.Thread(target=myfunc2)
    worker.start()
    worker.join()

def main():
    """Set up the child watcher and main loop, then run ``myfunc3`` to completion."""
    # Called only for its side effect of creating the child watcher here,
    # before the loop is fetched; the returned watcher is not kept.
    asyncio.get_child_watcher()
    main_loop = asyncio.get_event_loop()
    task = asyncio.ensure_future(myfunc3())
    main_loop.run_until_complete(task)
    main_loop.close()

# Run the reproduction as soon as the script is executed.
main()
like image 387
subin becker Avatar asked Dec 21 '18 10:12

subin becker


1 Answer

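I think this fixes it: use the loop's `run_in_executor` to run the threaded function instead of starting and joining a `threading.Thread` inside the coroutine. In the original, `myfunc3` calls `t.join()` from a coroutine, which blocks the main event loop; the child watcher is attached to that loop, so it never gets a chance to report that the subprocess has exited. With `run_in_executor` the main loop keeps running while the worker thread does its work.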

import asyncio
import shlex
import threading
import subprocess
import logging
async def sendcmd(cmd, timeout=5):
    """Run *cmd* as a child process and return its decoded standard output.

    The command string is split with ``shlex`` and executed directly, without
    a shell. The command, the child's pid and the captured output are printed.

    Args:
        cmd: Command line to execute, as a single string.
        timeout: Seconds to wait for the child to finish (default 5, the
            value that used to be hard-coded).

    Returns:
        The child's standard output decoded as UTF-8.

    Raises:
        asyncio.TimeoutError: If the child does not finish within *timeout*
            seconds. The child is killed before the error propagates.
    """
    cmdseq = tuple(shlex.split(cmd))
    print(cmd)
    p = await asyncio.create_subprocess_exec(
        *cmdseq, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    print(p.pid)
    try:
        # stderr is captured as well, but only stdout is used.
        stdout, _ = await asyncio.wait_for(p.communicate(), timeout)
    except asyncio.TimeoutError:
        # wait_for() only cancels communicate(); without an explicit kill the
        # child process would keep running after the caller has given up.
        try:
            p.kill()
        except ProcessLookupError:
            pass  # the child exited on its own in the meantime
        raise
    output = stdout.decode('utf8')
    print(output)
    return output

async def myfunc(cmd):
    """Await ``sendcmd(cmd)`` and hand its result straight back."""
    return await sendcmd(cmd)

def myfunc2():
    """Thread entry point: run ``myfunc('uname -a')`` on a private event loop.

    A new loop is created, installed as the calling thread's current loop and
    switched to debug mode. It is driven until the gathered task finishes and
    is always closed afterwards.

    Raises:
        Whatever the gathered task raises (for example a timeout from the
        command helper); the loop is closed before the exception propagates.
    """
    thread_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(thread_loop)
    thread_loop.set_debug(True)
    try:
        # ensure_future() picks up the loop that was just installed above.
        tasks = [asyncio.ensure_future(myfunc('uname -a'))]
        thread_loop.run_until_complete(asyncio.gather(*tasks))
    finally:
        # Close the loop on failure too, so an error does not leak it.
        thread_loop.close()

async def myfunc3(loop=None):
    """Run ``myfunc2`` in the loop's default executor and await its completion.

    Unlike a blocking ``Thread.join()``, awaiting the executor future leaves
    the calling event loop free to run while the worker thread is busy.

    Args:
        loop: Event loop whose default executor should run ``myfunc2``.
            Defaults to the loop that is running this coroutine.
    """
    if loop is None:
        # Inside a coroutine this returns the loop currently running it;
        # previously the None default failed with AttributeError.
        loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, myfunc2)

def main():
    """Configure logging, then run ``myfunc3`` on the main thread's event loop.

    asyncio debug output is written to ``test.log``. The child watcher is
    obtained before the main loop is fetched, and both are closed when the
    run ends, whether it succeeded or raised.
    """
    logfilename = 'test.log'
    print('Writing log to {}'.format(logfilename))
    logging.basicConfig(filename=logfilename, level=logging.INFO, format='%(asctime)s %(name)s %(module)s %(levelname)-8s %(message)s')
    logging.getLogger('asyncio').setLevel(logging.DEBUG)

    cw = asyncio.get_child_watcher()
    main_loop = asyncio.get_event_loop()
    try:
        main_loop.run_until_complete(asyncio.ensure_future(myfunc3(loop=main_loop)))
    finally:
        # Release the watcher and the loop even if the run failed.
        cw.close()
        main_loop.close()

# Run the example as soon as the script is executed.
main()
like image 65
rjmcmahon Avatar answered Nov 04 '22 06:11

rjmcmahon