The problem is that I keep getting a RuntimeError: Event loop is closed error, even when I use return_when=asyncio.FIRST_COMPLETED inside await asyncio.wait().
My code:
async def task_manager():
    tasks = [grab_proxy() for _ in range(10)]
    finished, unfinished = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    for x in finished:
        result = x.result()
        if result:
            return result


def get_proxy_loop():
    loop = asyncio.new_event_loop()
    proxy = loop.run_until_complete(task_manager())
    loop.close()
    return proxy


if __name__ == '__main__':
    p = get_proxy_loop()
    print(type(p))
    print(p)
Expected behaviour:

return_when=asyncio.FIRST_COMPLETED should kill all remaining tasks "under the hood" when the first result is returned.

But in fact there are still uncompleted tasks left after the first result is returned. And after I close the loop in get_proxy_loop() and access the result inside __main__, those remaining tasks raise RuntimeError: Event loop is closed.
Console output:
<class 'str'>
78.32.35.21:55075
Task was destroyed but it is pending!
task: <Task pending coro=<grab_proxy() running at /home/pata/PycharmProjects/accs_farm/accs_farm/proxy_grabber.py:187> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fc5187a8798>()]>>
Exception ignored in: <coroutine object grab_proxy at 0x7fc5150aae60>
Traceback (most recent call last):
  File "/home/pata/proxy_grabber.py", line 187, in grab_proxy
    proxy = await async_get_proxy()
  File "/home/pata/proxy_grabber.py", line 138, in async_get_proxy
    async with session.get(provider_url, timeout=5, params=params) as r:
  File "/home/pata/venvs/test_celery/lib/python3.6/site-packages/aiohttp/client.py", line 855, in __aenter__
    self._resp = await self._coro
  File "/home/pata/venvs/test_celery/lib/python3.6/site-packages/aiohttp/client.py", line 396, in _request
    conn.close()
  File "/home/pata/venvs/test_celery/lib/python3.6/site-packages/aiohttp/connector.py", line 110, in close
    self._key, self._protocol, should_close=True)
  File "/home/pata/venvs/test_celery/lib/python3.6/site-packages/aiohttp/connector.py", line 547, in _release
    transport = protocol.close()
Event loop is closed
  File "/home/pata/venvs/test_celery/lib/python3.6/site-packages/aiohttp/client_proto.py", line 54, in close
    transport.close()
  File "/usr/lib/python3.6/asyncio/selector_events.py", line 621, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/usr/lib/python3.6/asyncio/base_events.py", line 580, in call_soon
    self._check_closed()
  File "/usr/lib/python3.6/asyncio/base_events.py", line 366, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
...
...
Task was destroyed but it is pending!
task: <Task pending coro=<grab_proxy() done, defined at /home/pata/proxy_grabber.py:183> wait_for=<Future pending cb=[BaseSelectorEventLoop._sock_connect_done(11)(), <TaskWakeupMethWrapper object at 0x7fc514d15e28>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<grab_proxy() done, defined at /proxy_grabber.py:183> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fc5187a8588>()]>>
Event loop is closed
Process finished with exit code 0
The asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED) coroutine returns when at least one of the tasks has completed; the other tasks can still be active. It is not the job of asyncio.wait() to cancel those tasks for you. The use case of asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED) is to let you monitor tasks and act on their results as they complete; you usually would call it repeatedly until all your tasks are finished.
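You can see this directly (a minimal sketch with an invented worker() coroutine, not code from the question): after the first task completes, the slower tasks are still live in the pending set until you cancel or await them:

import asyncio

async def worker(delay):
    await asyncio.sleep(delay)
    return delay

async def main():
    tasks = [asyncio.ensure_future(worker(d)) for d in (0.1, 1, 2)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(len(done), len(pending))  # 1 2 -- the two slower tasks are still running
    for t in pending:               # they stay active until explicitly cancelled
        t.cancel()
    await asyncio.wait(pending)

asyncio.get_event_loop().run_until_complete(main())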
From the asyncio.wait() documentation:

Run awaitable objects in the aws set concurrently and block until the condition specified by return_when.

[...]

return_when indicates when this function should return. It must be one of the following constants:

FIRST_COMPLETED
    The function will return when any future finishes or is cancelled. [...]

Unlike wait_for(), wait() does not cancel the futures when a timeout occurs.
The documentation explicitly states that it will not cancel futures, even when you set a timeout (if you do set a timeout, then the first done set is simply empty, the tasks are all still active and listed in the second pending set).
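To illustrate that last point (a minimal sketch, not code from the question or the documentation): with a timeout shorter than any task, the done set comes back empty and nothing has been cancelled:

import asyncio

async def slow():
    await asyncio.sleep(10)

async def main():
    tasks = [asyncio.ensure_future(slow()) for _ in range(3)]
    done, pending = await asyncio.wait(tasks, timeout=0.1)
    print(len(done), len(pending))            # 0 3
    print(any(t.cancelled() for t in tasks))  # False -- wait() cancelled nothing
    for t in pending:                         # clean up before the loop closes
        t.cancel()
    await asyncio.wait(pending)

asyncio.get_event_loop().run_until_complete(main())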
If you need the unfinished tasks to be cancelled, do so explicitly:
while tasks:
    finished, unfinished = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for x in finished:
        result = x.result()
        if result:
            # Cancel the other tasks, we have a result. We need to wait for
            # the cancellations to propagate; note that asyncio.wait() raises
            # a ValueError on an empty set, so guard against that.
            if unfinished:
                for task in unfinished:
                    task.cancel()
                await asyncio.wait(unfinished)
            return result
    tasks = unfinished
Demo with some extra printing and randomised tasks:
>>> import asyncio
>>> import random
>>> async def grab_proxy(taskid):
...     await asyncio.sleep(random.uniform(0.1, 1))
...     result = random.choice([None, None, None, 'result'])
...     print(f'Task #{taskid} producing result {result!r}')
...     return result
...
>>> async def task_manager():
...     tasks = [grab_proxy(i) for i in range(10)]
...     while tasks:
...         finished, unfinished = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
...         for x in finished:
...             result = x.result()
...             print(f"Finished task produced {result!r}")
...             if result:
...                 # Cancel the other tasks, we have a result. We need to wait
...                 # for the cancellations to propagate.
...                 if unfinished:
...                     print(f"Cancelling {len(unfinished)} remaining tasks")
...                     for task in unfinished:
...                         task.cancel()
...                     await asyncio.wait(unfinished)
...                 return result
...         tasks = unfinished
...
>>>
>>> def get_proxy_loop():
...     loop = asyncio.new_event_loop()
...     proxy = loop.run_until_complete(task_manager())
...     loop.close()
...     return proxy
...
>>> get_proxy_loop()
Task #7 producing result None
Finished task produced None
Task #0 producing result 'result'
Finished task produced 'result'
Cancelling 8 remaining tasks
'result'
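As a side note (not part of the original answer): on Python 3.7+ you could replace get_proxy_loop() with asyncio.run(), which creates a fresh event loop, runs the coroutine, cancels any tasks that are still pending, and only then closes the loop, so the "Task was destroyed but it is pending!" warnings cannot occur. A sketch, assuming the task_manager() coroutine defined above:

import asyncio

def get_proxy():
    # asyncio.run() (Python 3.7+) handles loop creation, cancellation of
    # leftover tasks, and loop closing for you.
    return asyncio.run(task_manager())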