I have two problems with concurrent.futures:
Conclusion: time.sleep() cannot be interrupted. One solution is: You can write a loop around it and do short sleeps.
See How to break time.sleep() in a python concurrent.futures
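The "loop of short sleeps" idea can be sketched as follows. This is a minimal illustration, not the code from the linked question: the helper name `interruptible_sleep`, the `step` size, and the use of a `threading.Event` as the stop signal are all assumptions made for the example.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def interruptible_sleep(total, stop_event, step=0.05):
    """Sleep up to `total` seconds in short slices; return True if interrupted."""
    deadline = time.monotonic() + total
    while not stop_event.is_set():
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False  # slept for the full duration
        time.sleep(min(step, remaining))
    return True  # a stop was requested before the deadline


def run_demo():
    stop_event = threading.Event()
    with ThreadPoolExecutor(max_workers=1) as executor:
        started = time.monotonic()
        future = executor.submit(interruptible_sleep, 10, stop_event)
        time.sleep(0.2)
        stop_event.set()  # ask the worker to wake up early
        interrupted = future.result(timeout=2)
    return interrupted, time.monotonic() - started
```

The worker checks the event between slices, so it reacts within roughly one `step` rather than after the full ten seconds.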
Conclusion: individual timeouts need to be implemented by the user. For example, you can call wait() once for each timeout.
See Individual timeouts for concurrent.futures
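One way to read "call wait() for each timeout" is sketched below. The function names and the job tuples are hypothetical. Note two limitations of this sketch: each timeout is measured from its own `wait()` call, not from submission, and a timed-out worker thread keeps running in the background because the timeout only stops the waiting.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def slow_square(x, delay):
    time.sleep(delay)
    return x * x


def run_with_individual_timeouts(jobs):
    """jobs: list of (x, delay, timeout) tuples; returns results or 'timeout'."""
    outcomes = []
    with ThreadPoolExecutor(max_workers=len(jobs)) as executor:
        submitted = [
            (executor.submit(slow_square, x, delay), timeout)
            for x, delay, timeout in jobs
        ]
        for future, timeout in submitted:
            # Wait on this one future with its own timeout
            done, _not_done = wait([future], timeout=timeout)
            outcomes.append(future.result() if future in done else "timeout")
    return outcomes
```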
Does asyncio solve these problems?
asyncio is a library to write concurrent code using the async/await syntax. asyncio is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web-servers, database connection libraries, distributed task queues, etc.
class asyncio.Future(*, loop=None): A Future represents an eventual result of an asynchronous operation. Not thread-safe. Future is an awaitable object.
The concurrent.futures module provides a high-level interface for asynchronously executing callables. The asynchronous execution can be performed with threads, using ThreadPoolExecutor, or separate processes, using ProcessPoolExecutor.
In the asyncio model, execution is scheduled and coordinated by an event loop. To cancel execution of a currently suspended task, you essentially just have to not resume it. While this works a little differently in practice, it should be obvious that this makes cancelling a suspended task simple in theory.
Individual timeouts are certainly possible the same way: whenever you suspend a coroutine to wait for a result, you can supply a timeout value. The event loop makes sure the waiting task is cancelled if that timeout is reached before the task has completed.
Some concrete samples:
>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>> task = asyncio.ensure_future(asyncio.sleep(5))
>>> task.cancel()
>>> loop.run_until_complete(task)
Traceback (most recent call last):
...
concurrent.futures._base.CancelledError
In practice, this might be implemented using something like this:
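import asyncio

class Foo:
    task = None

    async def sleeper(self):
        # Wrap the coroutine in a task; a bare coroutine object has no cancel()
        self.task = asyncio.ensure_future(asyncio.sleep(60))
        try:
            await self.task
        except asyncio.CancelledError:
            # Placeholder: handle the cancellation (clean up) here
            raise NotImplementedError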
While this method is asleep, somebody else can call foo.task.cancel() to wake up the coroutine and let it handle the cancellation. Alternatively, whoever calls sleeper() can cancel it directly without giving it a chance to clean up.
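A self-contained sketch of the cancel-and-clean-up flow, assuming Python 3.7+ for `asyncio.run`. The `log` list and the function names are only there for illustration, so the order of events can be observed.

```python
import asyncio


async def sleeper(log):
    try:
        await asyncio.sleep(60)
    except asyncio.CancelledError:
        log.append("cleanup")  # the coroutine gets a chance to tidy up
        raise  # re-raise so the task is marked as cancelled


async def main():
    log = []
    task = asyncio.ensure_future(sleeper(log))
    await asyncio.sleep(0.01)  # let the task start and suspend on its sleep
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log, task.cancelled()


result = asyncio.run(main())
```

Re-raising inside the handler matters here: if the coroutine swallowed the exception, the task would finish normally and `task.cancelled()` would report `False`.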
Setting timeouts is similarly easy:
>>> loop.run_until_complete(asyncio.wait_for(asyncio.sleep(60), 5))
[ ... 5 seconds later ... ]
Traceback (most recent call last):
...
concurrent.futures._base.TimeoutError
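The same call can give every coroutine its own timeout. The following is a small sketch, assuming Python 3.7+ for `asyncio.run`; the job names, delays, and the `with_timeout` helper are made up for the example.

```python
import asyncio


async def job(name, delay):
    await asyncio.sleep(delay)
    return name


async def with_timeout(name, delay, timeout):
    # Each call wraps its own job in wait_for with its own timeout value
    try:
        return await asyncio.wait_for(job(name, delay), timeout)
    except asyncio.TimeoutError:
        return f"{name}: timed out"


async def main():
    return await asyncio.gather(
        with_timeout("fast", 0.01, 1.0),
        with_timeout("slow", 5.0, 0.05),
    )


results = asyncio.run(main())
```

Unlike the thread-based case, the timed-out job is actually cancelled by `wait_for`, so nothing keeps running in the background.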
Particularly in the context of HTTP request timeouts, see aiohttp:
# Note: aiohttp.Timeout comes from older aiohttp releases; newer releases
# configure timeouts with aiohttp.ClientTimeout instead.
async def fetch_page(session, url):
    with aiohttp.Timeout(10):
        async with session.get(url) as response:
            assert response.status == 200
            return await response.read()

with aiohttp.ClientSession(loop=loop) as session:
    content = loop.run_until_complete(fetch_page(session, 'http://python.org'))
Obviously each call to fetch_page can decide on its own aiohttp.Timeout value, and each individual instance will raise its own exception when that timeout is reached.
You can catch the cancellation (asyncio.CancelledError) and re-raise it immediately in the exception handler.
I use this pattern to handle it:
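import asyncio

async def worker():
    try:
        # Placeholder: await the actual coroutine here
        await asyncio.sleep(60)
    except asyncio.CancelledError:
        # Do stuff (clean up), then re-raise so the cancellation propagates
        raise
    except Exception as exc:
        # Do stuff
        print(exc)
    finally:
        # Runs on every exit path, including cancellation
        await asyncio.sleep(2)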