Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to catch concurrent.futures._base.TimeoutError correctly when using asyncio.wait_for and asyncio.Semaphore?

First of all, i need to warn you: I'm new to asyncio, and i h I warn you right away, I'm new to asyncio, and I can hardly imagine what is in the library under the hood.

Here is my code:

import asyncio

semaphore = asyncio.Semaphore(50)

async def work(value):
    async with semaphore:
        print(value)
        await asyncio.sleep(10)

async def main():
    tasks = []
    for i in range(0, 10000):
        tasks.append(asyncio.wait_for(work(i), timeout=3))
    await asyncio.gather(*tasks)

loop = asyncio.get_event_loop()
future = asyncio.ensure_future(main())
loop.run_until_complete(future)

What i need: Coroutine work() to be completed for no more than 3 seconds, and no more than 50 pieces at a same time. After 3 seconds (timeout), the coroutine work() must stop execution, and a new 50 tasks must start to work. But in my case, after 3 seconds crashes:

Traceback (most recent call last):
  File "C:/Users/root/PycharmProjects/LogParser/ssh/async/asyn_test.py", line 19, in <module>
    loop.run_until_complete(future)
  File "C:\Code\Python3\lib\asyncio\base_events.py", line 579, in run_until_complete
    return future.result()
  File "C:/Users/root/PycharmProjects/LogParser/ssh/async/asyn_test.py", line 15, in main
    await asyncio.gather(*tasks)
  File "C:\Code\Python3\lib\asyncio\tasks.py", line 449, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError

Whatever I had not tried to catch this exception, no matter how many tasks remained, program breaks down. I need, upon reaching timeout, the program continued to the next tasks Please, teach me, how do I need properly implement this?

Python 3.7, asyncio 3.4.3

like image 700
Roman Isakov Avatar asked May 20 '20 09:05

Roman Isakov


1 Answers

You need to handle the exception. If you just pass it to gather, it will re-raise it. For example, you can create a new coroutine with the appropriate try/except:

semaphore = asyncio.Semaphore(50)

async def work(value):
    print(value)
    await asyncio.sleep(10)

async def work_with_timeout(value):
    async with semaphore:
        try:
            return await asyncio.wait_for(work(value), timeout=3)
        except asyncio.TimeoutError:
            return None

async def main():
    tasks = []
    for i in range(0, 10000):
        tasks.append(work_with_timeout(i))
    await asyncio.gather(*tasks)
like image 173
user4815162342 Avatar answered Nov 17 '22 17:11

user4815162342