 

Connect two processes started with asyncio.subprocess.create_subprocess_exec()

When starting two processes with the old-school subprocess.Popen() API, I can easily connect the standard output of one process to the standard input of another, creating a pipeline in the same way a UNIX shell does when commands are connected with |:

from subprocess import Popen, PIPE

process_1 = Popen(['ls'], stdout = PIPE)
process_2 = Popen(['wc'], stdin = process_1.stdout)

process_1.wait()
process_2.wait()

How can I accomplish the same when using the asynchronous API from asyncio.subprocess.create_subprocess_exec() (or similar)? This is what I tried:

from asyncio.events import get_event_loop
from asyncio.subprocess import PIPE, create_subprocess_exec

async def main():
    process_1 = await create_subprocess_exec('ls', stdout = PIPE)
    process_2 = await create_subprocess_exec('wc', stdin = process_1.stdout)

    await process_1.wait()
    await process_2.wait()

get_event_loop().run_until_complete(main())

But the second call to create_subprocess_exec() complains that the argument passed to stdin has no fileno (which is true):

Traceback (most recent call last):
  File ".../test-async.py", line 11, in <module>
     get_event_loop().run_until_complete(main())
[...]
  File ".../test-async.py", line 6, in main
     process_2 = await create_subprocess_exec('wc', stdin = process_1.stdout)
[...]
  File "/opt/local/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/subprocess.py", line 1388, in _get_handles
     p2cread = stdin.fileno()
AttributeError: 'StreamReader' object has no attribute 'fileno'

How can I get the same result as in the synchronous example above?

Asked by Feuermurmel on Apr 15 '16 at 22:04


1 Answer

In asyncio, process.stdout is a StreamReader, not a file object, so it has no fileno() method. The underlying file object can be reached through process._transport._proc.stdout, but these are private attributes. Even then you cannot use it, because it has already been registered with the event loop in order to provide the stream interface process.stdout.
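The difference is easy to check. The following is a minimal sketch, assuming Python 3.7+ (for asyncio.run) and a UNIX-like system where an echo executable is on the PATH; the helper name inspect_stdout is made up for illustration.

```python
import asyncio
from asyncio.subprocess import PIPE


async def inspect_stdout():
    # Hypothetical helper: start a short-lived child with a piped stdout.
    process = await asyncio.create_subprocess_exec("echo", "hello", stdout=PIPE)
    stream = process.stdout
    # Record the stream's type name and whether it exposes fileno().
    info = (type(stream).__name__, hasattr(stream, "fileno"))
    # Drain the output and wait for the child to exit.
    await process.communicate()
    return info


if __name__ == "__main__":
    print(asyncio.run(inspect_stdout()))
```

The missing fileno() is exactly what subprocess.py trips over in the traceback from the question.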

One way to deal with the issue is to create your own pipe and pass the file descriptors to the subprocess:

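import os
from asyncio.subprocess import PIPE, create_subprocess_exec

async def main():
    # Create an OS-level pipe; both ends are plain file descriptors.
    read, write = os.pipe()
    process_1 = await create_subprocess_exec('ls', stdout=write)
    os.close(write)  # the child holds its own copy of the write end
    process_2 = await create_subprocess_exec('wc', stdin=read, stdout=PIPE)
    os.close(read)  # likewise for the read end
    output = await process_2.stdout.read()  # read until EOF
    await process_1.wait()
    await process_2.wait()
    return output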

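Note that the write file descriptor must be closed explicitly once the first subprocess has started (it is not closed automatically unless you use subprocess.PIPE). If the parent keeps it open, the second process never sees end-of-file on its standard input. The read file descriptor also needs to be closed in the parent, so that the first process receives SIGPIPE if the second one exits early.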
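For reference, here is a self-contained variant of the same idea. It is a sketch under a few assumptions rather than a definitive implementation: it targets Python 3.7+ (asyncio.run) on a UNIX-like system, and the function name run_pipeline and its return value are made up for illustration. It uses the same os.pipe() technique, additionally closes the descriptors if spawning fails, and waits for both children.

```python
import asyncio
import os
from asyncio.subprocess import PIPE


async def run_pipeline(first_cmd, second_cmd):
    """Run `first_cmd | second_cmd`; return (output, returncode_1, returncode_2)."""
    read_fd, write_fd = os.pipe()
    try:
        try:
            process_1 = await asyncio.create_subprocess_exec(
                *first_cmd, stdout=write_fd)
        finally:
            # The parent must not keep the write end open, otherwise the
            # second process would never see end-of-file.
            os.close(write_fd)
        process_2 = await asyncio.create_subprocess_exec(
            *second_cmd, stdin=read_fd, stdout=PIPE)
    finally:
        # The parent does not need the read end either; closing it lets
        # the first process get SIGPIPE if the second one exits early.
        os.close(read_fd)
    # Collect the second process's output, then reap the first process.
    output, _ = await process_2.communicate()
    await process_1.wait()
    return output, process_1.returncode, process_2.returncode


if __name__ == "__main__":
    out, rc_1, rc_2 = asyncio.run(run_pipeline(["ls"], ["wc"]))
    print(out.decode(), rc_1, rc_2)
```

Taking the commands as argument lists keeps the pipe handling in one place, so the same function can be reused for other two-stage pipelines.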

Answered by Vincent on Sep 22 '22 at 13:09