
Python3 asyncio: wait_for() communicate() with timeout, how to get partial result?

The Python docs about asyncio - Subprocess say:

The communicate() and wait() methods don’t take a timeout parameter: use the wait_for() function

It's pretty easy to impose a timeout on communicate() using wait_for(). However, I can't find a way to retrieve the partial results from the interrupted communicate() call, and subsequent calls to communicate() don't return the lost part either.

Example script:

#! /usr/bin/env python3

import asyncio

async def communicate_short(loop):
    p = await asyncio.create_subprocess_exec('ping', '127.0.0.1', '-n', '4', stdout=asyncio.subprocess.PIPE)
    # For Linux: use '-c' instead of '-n'

    try:
        # 2 seconds timeout
        res = await asyncio.wait_for(p.communicate(), 2)
    except asyncio.TimeoutError as e:
        # After timeout happens:
        # How do I get the subprocess's STDOUT up to this point?
        try:
            print(res[0].decode('utf-8'))
            # Will raise NameError since the communicate() call did not complete
        except NameError as e:
            print('NameError: %s' % e)


    res = await p.communicate()
    print(res[0].decode('utf-8'))
    # Only prints the later half of ping's STDOUT

if __name__ == '__main__':
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
    # For Linux: just do loop = asyncio.get_event_loop()

    loop.run_until_complete(communicate_short(loop))

Output from the example script:

NameError: local variable 'res' referenced before assignment
Reply from 127.0.0.1: bytes=32 time<1ms TTL=128
Reply from 127.0.0.1: bytes=32 time<1ms TTL=128

Ping statistics for 127.0.0.1:
    Packets: Sent = 4, Received = 4, Lost = 0 (0% loss),
Approximate round trip times in milli-seconds:
    Minimum = 0ms, Maximum = 0ms, Average = 0ms

Notice that only the last 2 packets are printed. The output for the first 2 packets is lost.

So, how should I get the output from the subprocess before the timeout happened?

Edit: To be more precise, ideally what I'm looking for is something that:

  1. Does what communicate() does, i.e. asynchronously write to a subprocess's STDIN and read its STDOUT and STDERR, without possibility of deadlocking (that the docs ominously warn about);

  2. has a configurable total timeout, so that when either the subprocess terminates or the timeout is reached, the received STDOUT and STDERR so far are returned.

Looks like such a thing does not exist yet, and one would have to implement it.

asked by twisteroid ambassador, Mar 07 '17 04:03


2 Answers

For the second part of your question, "how should I get the output from the subprocess before the timeout happened?", I would suggest using asyncio.wait(), which does not cancel the task (p.communicate()), instead of asyncio.wait_for(), which does cancel it:

task = asyncio.ensure_future(p.communicate())
done, pending = await asyncio.wait([task], timeout=2)
if pending:
    # wait() only stopped waiting; the task itself is still running
    print("timeout!")
res = await task  # Note: It is OK to await a task more than once
print(res[0].decode())
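
The difference between the two functions can be seen without any subprocess at all. The following is a minimal sketch, not taken from the original answer: slow() and compare() are hypothetical names, and the sleep durations are arbitrary.

```python
import asyncio


async def slow():
    # Stand-in for a long-running operation such as p.communicate()
    await asyncio.sleep(0.5)
    return "finished"


async def compare():
    # wait_for() cancels the awaited task when the timeout expires
    t1 = asyncio.ensure_future(slow())
    try:
        await asyncio.wait_for(t1, timeout=0.1)
    except asyncio.TimeoutError:
        pass
    cancelled = t1.cancelled()

    # wait() only stops waiting; the task keeps running in the background
    t2 = asyncio.ensure_future(slow())
    done, pending = await asyncio.wait([t2], timeout=0.1)
    still_pending = t2 in pending
    result = await t2  # awaiting again yields the real result
    return cancelled, still_pending, result


if __name__ == '__main__':
    print(asyncio.run(compare()))
```

The first task ends up cancelled, while the second one is merely reported as pending and still delivers its result when awaited again.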

Regarding "retrieve the partial results", I would suggest not using communicate(), which calls stdout.read() and therefore returns nothing until EOF. Read the stream incrementally instead:

import asyncio


async def ping(host):
    # '-c' is the Linux/macOS count flag; use '-n' on Windows
    p = await asyncio.create_subprocess_exec(
        'ping', host, '-c', '4',
        stdout=asyncio.subprocess.PIPE)

    # Each line is handled as it arrives, so stopping early loses nothing
    async for line in p.stdout:
        print(host, "==>", line.decode(), end="")

    await p.wait()
    print(host, "done")


async def main():
    # Run the three pings concurrently
    await asyncio.gather(
        ping('8.8.8.8'),
        ping('127.0.0.1'),
        ping('example.com'),
    )


if __name__ == '__main__':
    asyncio.run(main())

Combining the two solutions (and using readline() instead of the much cooler async for) gives:

import asyncio


async def ping(host):
    # '-c' is the Linux/macOS count flag; use '-n' on Windows
    p = await asyncio.create_subprocess_exec(
        'ping', host, '-c', '10',
        stdout=asyncio.subprocess.PIPE)

    n = 0
    while True:
        n += 1
        task = asyncio.ensure_future(p.stdout.readline())
        done, pending = await asyncio.wait([task], timeout=1)
        if not done:
            print(host, n, "==>", "Timeout!")
        # wait() did not cancel the task, so awaiting it again loses no data
        line = await task
        if not line:
            break
        print(host, n, "==>", line.decode(), end="")

    await p.wait()
    print(host, "==>", "done")


if __name__ == '__main__':
    asyncio.run(ping('example.com'))

Notice the timeout (1 second) is per line.
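
If a single deadline for the whole run is wanted instead, one possible approach is to collect chunks into a buffer that lives outside the reading coroutine, so that cancelling the reader does not discard what was already received. The sketch below is an illustration under assumptions, not part of the original answer: run_with_total_timeout() is a hypothetical helper, it only captures STDOUT, and it kills the subprocess when the deadline passes.

```python
import asyncio
import sys


async def run_with_total_timeout(args, timeout):
    """Hypothetical helper: return (stdout_so_far, timed_out) for one command."""
    p = await asyncio.create_subprocess_exec(
        *args, stdout=asyncio.subprocess.PIPE)
    chunks = []

    async def pump():
        # Store every chunk as soon as it arrives, so cancelling this
        # coroutine cannot discard data that was already read.
        while True:
            chunk = await p.stdout.read(4096)
            if not chunk:
                break
            chunks.append(chunk)

    timed_out = False
    try:
        # One deadline for the whole read loop, not one per line
        await asyncio.wait_for(pump(), timeout)
    except asyncio.TimeoutError:
        timed_out = True
        try:
            p.kill()
        except ProcessLookupError:
            pass  # the process had already exited
        # Killing the child closes its end of the pipe; collect what is left
        chunks.append(await p.stdout.read())
    await p.wait()
    return b"".join(chunks), timed_out


if __name__ == '__main__':
    # Demo child: prints once, then sleeps well past the deadline
    child = ("import time; print('first', flush=True); "
             "time.sleep(30); print('second')")
    print(asyncio.run(
        run_with_total_timeout([sys.executable, '-c', child], 2)))
```

Note that the final read() waits for EOF, so it could block if a grandchild process keeps the pipe open after the kill; a real implementation would probably need to guard against that.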

See also: https://github.com/aio-libs/async-timeout

answered by Udi, Nov 11 '22 02:11


If killing the subprocess after the timeout is acceptable, you can get the partial output like this:

future = asyncio.ensure_future(p.communicate())
done, pending = await asyncio.wait([future], timeout=2)
if pending:
    # timeout
    if p.returncode is None:
        # kill the subprocess, then `await future` will return soon
        try:
            p.kill()
        except ProcessLookupError:
            pass
output, err = await future
print(output.decode('utf-8'))
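
The same idea can be wrapped in a reusable function that also feeds STDIN, which comes close to the "communicate() with a total timeout" asked for in the question. This is only a sketch: communicate_with_timeout() is a hypothetical name, and it assumes that killing the subprocess on timeout is acceptable.

```python
import asyncio
import sys


async def communicate_with_timeout(p, timeout, input=None):
    """Hypothetical helper: like p.communicate(), but kill p on timeout.

    Returns (stdout, stderr, timed_out).
    """
    future = asyncio.ensure_future(p.communicate(input))
    # wait() does not cancel the future, so data read so far is kept
    done, pending = await asyncio.wait([future], timeout=timeout)
    timed_out = bool(pending)
    if timed_out and p.returncode is None:
        try:
            p.kill()
        except ProcessLookupError:
            pass  # the process exited between the check and the kill
    # After the kill the pipes reach EOF, so communicate() returns soon
    stdout, stderr = await future
    return stdout, stderr, timed_out


async def demo():
    # Demo child: echoes one line from STDIN, then sleeps past the deadline
    child = ("import sys, time; print(sys.stdin.readline().strip(), "
             "flush=True); time.sleep(30); print('late')")
    p = await asyncio.create_subprocess_exec(
        sys.executable, '-c', child,
        stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
    return await communicate_with_timeout(p, 2, b"hello\n")


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

Because communicate() handles the reading and writing concurrently, this keeps its protection against pipe deadlocks while still returning whatever arrived before the kill.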

answered by Gnuth, Nov 11 '22 02:11