 

How to properly use asyncio.FIRST_COMPLETED

The problem is that I keep getting a RuntimeError: Event loop is closed error even though I use return_when=asyncio.FIRST_COMPLETED inside await asyncio.wait().

My code:

async def task_manager():
    tasks = [grab_proxy() for _ in range(10)]
    finished, unfinished = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    for x in finished:
        result = x.result()

        if result:
            return result


def get_proxy_loop():
    loop = asyncio.new_event_loop()

    proxy = loop.run_until_complete(task_manager())

    loop.close()
    return proxy


if __name__ == '__main__':
    p = get_proxy_loop()

    print(type(p))
    print(p)

Expected behaviour:

I expected return_when=asyncio.FIRST_COMPLETED to cancel all remaining tasks "under the hood" once the first result is returned.

But in fact some tasks remain uncompleted after the first result is returned. And after I close the loop in get_proxy_loop() and access the result inside __main__, those remaining tasks raise RuntimeError: Event loop is closed.

Console output:

<class 'str'>
78.32.35.21:55075
Task was destroyed but it is pending!
task: <Task pending coro=<grab_proxy() running at /home/pata/PycharmProjects/accs_farm/accs_farm/proxy_grabber.py:187> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fc5187a8798>()]>>
Exception ignored in: <coroutine object grab_proxy at 0x7fc5150aae60>
Traceback (most recent call last):
  File "/home/pata/proxy_grabber.py", line 187, in grab_proxy
    proxy = await async_get_proxy()
  File "/home/pata/proxy_grabber.py", line 138, in async_get_proxy
    async with session.get(provider_url, timeout=5, params=params) as r:
  File "/home/pata/venvs/test_celery/lib/python3.6/site-packages/aiohttp/client.py", line 855, in __aenter__
    self._resp = await self._coro
  File "/home/pata/venvs/test_celery/lib/python3.6/site-packages/aiohttp/client.py", line 396, in _request
    conn.close()
  File "/home/pata/venvs/test_celery/lib/python3.6/site-packages/aiohttp/connector.py", line 110, in close
    self._key, self._protocol, should_close=True)
  File "/home/pata/venvs/test_celery/lib/python3.6/site-packages/aiohttp/connector.py", line 547, in _release
Event loop is closed
    transport = protocol.close()
  File "/home/pata/venvs/test_celery/lib/python3.6/site-packages/aiohttp/client_proto.py", line 54, in close
    transport.close()
  File "/usr/lib/python3.6/asyncio/selector_events.py", line 621, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/usr/lib/python3.6/asyncio/base_events.py", line 580, in call_soon
    self._check_closed()
  File "/usr/lib/python3.6/asyncio/base_events.py", line 366, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
...
...
Task was destroyed but it is pending!
task: <Task pending coro=<grab_proxy() done, defined at /home/pata/proxy_grabber.py:183> wait_for=<Future pending cb=[BaseSelectorEventLoop._sock_connect_done(11)(), <TaskWakeupMethWrapper object at 0x7fc514d15e28>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<grab_proxy() done, defined at /proxy_grabber.py:183> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fc5187a8588>()]>>
Event loop is closed
Process finished with exit code 0
PATAPOsha asked Feb 20 '19

1 Answer

The asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED) coroutine returns when at least one of the tasks has completed. The other tasks can still be active. It is not the job of asyncio.wait() to cancel those tasks for you. The use case of asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED) is to let you monitor tasks and act on their results as they complete; you would usually call it repeatedly until all your tasks are finished.
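
That behaviour is easy to observe directly. A minimal sketch (not part of the original answer; it uses Python 3.7+'s asyncio.run() and asyncio.create_task() rather than the 3.6-era loop management from the question): a fast task and two slow ones, where wait() returns as soon as the fast one finishes while the slow ones are still pending:

```python
import asyncio

async def worker(delay):
    await asyncio.sleep(delay)
    return delay

async def main():
    # One fast task (0.1s) and two slow ones (5s each).
    tasks = [asyncio.create_task(worker(d)) for d in (0.1, 5, 5)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # wait() returned, but the slow tasks were NOT cancelled; they are
    # still running and show up in the `pending` set.
    print(f"{len(done)} done, {len(pending)} still pending")
    # Clean up explicitly, and wait for the cancellations to propagate.
    for task in pending:
        task.cancel()
    await asyncio.wait(pending)

asyncio.run(main())
```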

From the asyncio.wait() documentation:

Run awaitable objects in the aws set concurrently and block until the condition specified by return_when.

[...]

return_when indicates when this function should return. It must be one of the following constants:

FIRST_COMPLETED
The function will return when any future finishes or is cancelled.

[...]

Unlike wait_for(), wait() does not cancel the futures when a timeout occurs.

The documentation explicitly states that it will not cancel futures, even when you set a timeout. If the timeout expires before any task completes, the first done set is simply empty; the tasks are all still active and listed in the second pending set.
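
A minimal sketch of the timeout case (an illustration using Python 3.7+'s asyncio.run() and asyncio.create_task(), not code from the original answer): the timeout expires before the task can finish, so done is empty and the task sits in pending, still not cancelled:

```python
import asyncio

async def slow():
    await asyncio.sleep(10)

async def main():
    task = asyncio.create_task(slow())
    # The timeout (0.1s) expires long before slow() would finish.
    done, pending = await asyncio.wait({task}, timeout=0.1)
    # `done` is empty; the task was not cancelled and is still pending.
    print(f"done={len(done)} pending={len(pending)}")
    # Cancel explicitly and wait for the cancellation to propagate.
    task.cancel()
    await asyncio.wait({task})

asyncio.run(main())
```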

If you need the unfinished tasks to be cancelled, do so explicitly:

while tasks:
    finished, unfinished = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    for x in finished:
        result = x.result()

        if result:
            # cancel the other tasks, we have a result. We need to wait for the cancellations
            # to propagate.
            for task in unfinished:
                task.cancel()
            await asyncio.wait(unfinished)
            return result

    tasks = unfinished

Demo with some extra printing and randomised tasks:

>>> import asyncio
>>> import random
>>> async def grab_proxy(taskid):
...     await asyncio.sleep(random.uniform(0.1, 1))
...     result = random.choice([None, None, None, 'result'])
...     print(f'Task #{taskid} producing result {result!r}')
...     return result
...
>>> async def task_manager():
...     tasks = [grab_proxy(i) for i in range(10)]
...     while tasks:
...         finished, unfinished = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
...         for x in finished:
...             result = x.result()
...             print(f"Finished task produced {result!r}")
...             if result:
...                 # cancel the other tasks, we have a result. We need to wait for the cancellations
...                 # to propagate.
...                 print(f"Cancelling {len(unfinished)} remaining tasks")
...                 for task in unfinished:
...                     task.cancel()
...                 await asyncio.wait(unfinished)
...                 return result
...         tasks = unfinished
...
>>>
>>> def get_proxy_loop():
...     loop = asyncio.new_event_loop()
...     proxy = loop.run_until_complete(task_manager())
...     loop.close()
...     return proxy
...
>>> get_proxy_loop()
Task #7 producing result None
Finished task produced None
Task #0 producing result 'result'
Finished task produced 'result'
Cancelling 8 remaining tasks
'result'
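
As an aside: on Python 3.7 and later, the manual loop management in get_proxy_loop() can be replaced by asyncio.run(), which creates a fresh event loop, runs the coroutine, and closes the loop when it is done. A sketch with a stand-in task_manager() (a hypothetical stub, since the real one lives in the question's code):

```python
import asyncio

async def task_manager():
    # Stand-in for the task_manager() defined above; returns a dummy result.
    await asyncio.sleep(0)
    return "proxy"

def get_proxy():
    # asyncio.run() handles loop creation and cleanup (Python 3.7+),
    # replacing new_event_loop()/run_until_complete()/close().
    return asyncio.run(task_manager())

print(get_proxy())
```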
Martijn Pieters answered Sep 20 '22