Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Python3 asyncio "Task was destroyed but it is pending" with some specific condition

Here is simplified code, which uses python3 coroutine and sets handler for SIGING and SIGTERM signals for stopping job properly:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import asyncio
import signal
import sys


def my_handler(signum, frame):
    print('Stopping')
    asyncio.get_event_loop().stop()
    # Do some staff
    sys.exit()


@asyncio.coroutine
def prob_ip(ip_addr):

    print('Ping ip:%s' % ip_addr)
    proc = yield from asyncio.create_subprocess_exec('ping', '-c', '3', ip_addr)
    ret_code = yield from proc.wait()
    if ret_code != 0:
        print("ip:%s doesn't responding" % ip_addr)
        # Do some staff
        yield from asyncio.sleep(2)
        # Do more staff
        yield from asyncio.sleep(16)


@asyncio.coroutine
def run_probing():

    print('Start probing')
    # Do some staff
    yield from asyncio.sleep(1)

    while True:
        yield from asyncio.wait([prob_ip('192.168.1.3'), prob_ip('192.168.1.2')])
        yield from asyncio.sleep(60)


def main():
    parser = argparse.ArgumentParser()
    parser.description = "Probing ip."
    parser.parse_args()

    signal.signal(signal.SIGINT, my_handler)
    signal.signal(signal.SIGTERM, my_handler)

    asyncio.get_event_loop().run_until_complete(run_probing())


if __name__ == '__main__':
    main()

When i run it via:

python3 test1.py

It stops by Ctrl-C without any warnings. But when I run it via:

python3 -m test1

It prints warning by Ctrl-C:

$ python3 -m test1 
Start probing
Ping ip:192.168.1.2
Ping ip:192.168.1.3
PING 192.168.1.2 (192.168.1.2): 56 data bytes
PING 192.168.1.3 (192.168.1.3): 56 data bytes
^C--- 192.168.1.2 ping statistics ---
--- 192.168.1.3 ping statistics ---
1 packets transmitted, 0 packets received, 100% packet loss
1 packets transmitted, 0 packets received, 100% packet loss
Stopping
Task was destroyed but it is pending!
task: <Task pending coro=<prob_ip() running at /tmp/test1.py:22> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:394]>
Task was destroyed but it is pending!
task: <Task pending coro=<prob_ip() running at /tmp/test1.py:22> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:394]>

Same warning I get if I install this script via:

from setuptools import setup

setup(name='some_scripts',
      version='1.0.0.0',
      author='Some Team',
      author_email='[email protected]',
      url='https://www.todo.ru',
      description='Some scripts',
      packages=['my_package'],
      entry_points={'console_scripts': [
          'test1=my_package.test1:main',
      ]},
      )

My python version is "3.4.2"

like image 238
willir Avatar asked Nov 03 '15 17:11

willir


2 Answers

Ok. I think I have figured out how should I stop all tasks.

  1. First of all, as far as I understand. BaseEventLoop.stop() is only to stop BaseEventLoop.run_forever(). So one should cancel all tasks via Future.cancel. To get all tasks you can use Task.all_tasks static method.
  2. After cancellation all tasks asyncio.CancelledError exception will be raised from run_until_complete. So one should catch it, if one doesn't want to print it to stderr.
  3. And also, in some cases, I get this error: TypeError: signal handler must be signal.SIG_IGN, signal.SIG_DFL, or a callable object. I found some topics about this error:

    • https://bugs.python.org/issue23548
    • https://github.com/buildinspace/peru/issues/98

    They all say that it can be fixed by closing loop before exiting application.

So we get this code:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import signal


def my_handler():
    print('Stopping')
    for task in asyncio.Task.all_tasks():
        task.cancel()


@asyncio.coroutine
def do_some(some_args):
    while True:
        print("Do staff with %s" % some_args)
        yield from asyncio.sleep(2)


def main():
    loop = asyncio.get_event_loop()

    loop.add_signal_handler(signal.SIGINT, my_handler)

    try:
        loop.run_until_complete(asyncio.wait([do_some(1), do_some(2)]))
    except asyncio.CancelledError:
        print('Tasks has been canceled')
    finally:
        loop.close()


if __name__ == '__main__':
    main()

It works also with signal.signal. But as Vincent noticed loop.add_signal_handler looks better in this case.

But I am still not sure is this the best way to stop all tasks.

like image 197
willir Avatar answered Sep 30 '22 20:09

willir


Use asyncio.gather instead of asyncio.wait:

Cancellation: if the outer Future is cancelled, all children (that have not completed yet) are also cancelled.

Example:

def handler(future, loop):
    future.cancel()
    loop.stop()

@asyncio.coroutine
def do_some(arg):
    while True:
        print("Do stuff with %s" % arg)
        yield from asyncio.sleep(2)

loop = asyncio.get_event_loop()
future = asyncio.gather(do_some(1), do_some(2))
loop.add_signal_handler(signal.SIGINT, handler, future, loop)
loop.run_forever()
like image 31
Vincent Avatar answered Sep 30 '22 20:09

Vincent